[teiid-commits] teiid SVN: r3571 - in trunk: build/kits/jboss-container/deploy/teiid and 6 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Oct 19 21:51:51 EDT 2011


Author: shawkins
Date: 2011-10-19 21:51:50 -0400 (Wed, 19 Oct 2011)
New Revision: 3571

Modified:
   trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
   trunk/build/kits/jboss-container/teiid-releasenotes.html
   trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
   trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
   trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
   trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
   trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1750 lowering batch sizes, removing the small byte buffer slices in favor of absolute positioning

Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2011-10-20 01:51:50 UTC (rev 3571)
@@ -24,10 +24,10 @@
         <property name="useDisk">true</property>
         <!-- Directory location for the buffer files -->
         <property name="diskDirectory">${jboss.server.temp.dir}/teiid</property>
-        <!-- The max row count of a batch sent internally within the query processor. Should be <= the connectorBatchSize. (default 512) -->
-        <property name="processorBatchSize">512</property>
-        <!-- The max row count of a batch from a connector. Should be even multiple of processorBatchSize. (default 1024) -->
-        <property name="connectorBatchSize">1024</property>
+        <!-- The max row count of a batch sent internally within the query processor. Should be <= the connectorBatchSize. (default 256) -->
+        <property name="processorBatchSize">256</property>
+        <!-- The max row count of a batch from a connector. Should be even multiple of processorBatchSize. (default 512) -->
+        <property name="connectorBatchSize">512</property>
         <!-- 
             The approximate amount of buffer memory in kilobytes allowable for a single processing operation (sort, grouping, etc.) regardless of existing memory commitments. -1 means to automatically calculate a value (default -1).  
             See the admin guide for more.          

Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-10-20 01:51:50 UTC (rev 3571)
@@ -30,10 +30,10 @@
   <LI><B>File Enhancements</B> - the file translator can now optionally (via the ExceptionIfFileNotFound property) throw an exception if the path refers to a file that doesn't exist.  The file resource adapter can be configured to map file names and can prevent parent path .. references.  See the Admin Guide or the file-ds.xml template for more.
   <LI><B>TEXTTABLE Enhancements</B> - TEXTTABLE can now parse fixed width files that do not use a row delimiter and can optionally produce fixed values that haven't been trimmed.
   <LI><B>Temp table transactions</B> - Internal materialized views and temp table usage from a session and within procedures can take advantage of greater transaction support.
-  <LI><B>Buffering Improvements</B> - Added the ability to inline memory based or small lobs and added tracking of the memory held by soft references.  Also switched to a LFRU algorithm that significantly reduces writes and read misses with temporary tables.
+  <LI><B>Buffering Improvements</B> - Added the ability to inline memory based or small lobs and added tracking of the memory held by soft references.  Also switched to a concurrent LFRU algorithm that significantly reduces writes and read misses with temporary tables.  Added a memory buffer to better handle file storage.  
+  The memory buffer may be optionally configured as off-heap for better large memory performance.
   <LI><B>GSSAPI</B> - both the Teiid JDBC client/server and the ODBC pg backend can now support GSSAPI for single sign-on. 
   <LI><B>Server-side Query Timeouts</B> - default query timeouts can be configured at both the VDB (via the query-timeout VDB property) and entire server (via the teiid-jboss-beans.xml queryTimeout property).
-  <LI><B>Memory Improvements</B> - buffering was optimized for concurrency and to better handle table querying instead of tuple buffers.  Added a memory buffer to better handle file storage.  The memory buffer may be optional configured as off-heap for better large memory performance.
 </UL>
 
 <h2><a name="Compatibility">Compatibility Issues</a></h2>
@@ -119,6 +119,11 @@
 
 <h4>from 7.4</h4>
 <ul>
+  <li>The configuration for the buffer service now defaults to 256/512 for processor and connector batch sizes respectively.  The buffer service also has 4 new properties inlineLobs, memoryBufferSpace, memoryBufferOffHeap, and maxStorageObjectSize.
+</ul>
+
+<h4>from 7.4</h4>
+<ul>
   <LI>The configuration for authorization has been moved off of the RuntimeEngineDeployer bean and onto separate AuthorizationValidator and PolicyDecider beans.
   <LI>The configuration for the buffer manager has been simplified to refer to memory sizes in KB, rather than batch columns.
 </ul>

Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
===================================================================
--- trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml	2011-10-20 01:51:50 UTC (rev 3571)
@@ -67,7 +67,7 @@
 			<title>Big Data/Memory</title>
 			<para>Usage of extremely large VM sizes and or datasets requires additional considerations.
 			Teiid has a non-negligible amount of overhead per batch/table page on the order of 100-200 bytes.  Depending on the data types involved each
-			full batch/table page will represent between 64 and 4096 rows.  If you are dealing with datasets with billions of rows and you run into OutOfMemory issues, consider increasing the processor
+			full batch/table page will represent a variable number of rows (a power of two multiple above or below the processor batch size).  If you are dealing with datasets with billions of rows and you run into OutOfMemory issues, consider increasing the processor
 			batch size in the &jboss-beans; file to force the allocation of larger batches and table pages.  If the processor batch size is increased and/or you are dealing with extremely wide result sets (several hundred columns),
 			then the default setting of 8MB for the maxStorageObjectSize in the &jboss-beans; file may be too low.  The sizing for maxStorageObjectSize is terms of serialized size, which will be much
 			closer to the raw data size then the Java memory footprint estimation used for maxReservedKB.  

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -61,8 +61,8 @@
 		NO_WAIT
 	}
 
-	public static int DEFAULT_CONNECTOR_BATCH_SIZE = 1024;
-	public static int DEFAULT_PROCESSOR_BATCH_SIZE = 512;
+	public static int DEFAULT_CONNECTOR_BATCH_SIZE = 512;
+	public static int DEFAULT_PROCESSOR_BATCH_SIZE = 256;
 	public static int DEFAULT_MAX_PROCESSING_KB = -1;
 	public static int DEFAULT_RESERVE_BUFFER_KB = -1;
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -49,8 +49,6 @@
 @SuppressWarnings("unchecked")
 class SPage implements Cloneable {
 	
-	static final int MIN_PERSISTENT_SIZE = 16;
-
 	static class SearchResult {
 		int index;
 		SPage page;
@@ -186,7 +184,7 @@
 		if (values instanceof LightWeightCopyOnWriteList<?>) {
 			values = ((LightWeightCopyOnWriteList<List<?>>)values).getList();
 		}
-		if (values.size() < MIN_PERSISTENT_SIZE) {
+		if (values.size() < stree.minPageSize) {
 			setDirectValues(values);
 			return;
 		} else if (stree.batchInsert && children == null && values.size() < stree.leafSize) {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -65,6 +65,7 @@
     protected ListNestedSortComparator comparator;
     private int pageSize;
     protected int leafSize;
+    protected int minPageSize;
     protected int keyLength;
 	protected boolean batchInsert;
 	protected SPage incompleteInsert;
@@ -86,7 +87,7 @@
 		manager.setPrefersMemory(true);
 		this.leafManager = leafManager;
 		this.comparator = comparator;
-		this.pageSize = Math.max(pageSize, SPage.MIN_PERSISTENT_SIZE);
+		this.pageSize = pageSize;
 		pageSize >>>= 3;
 		while (pageSize > 0) {
 			pageSize >>>= 1;
@@ -97,6 +98,7 @@
 		this.leafSize = leafSize;
 		this.keyLength = keyLength;
 		this.lobManager = lobManager;
+		this.minPageSize = this.pageSize>>5;
 	}
 	
 	public STree clone() {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -42,22 +42,12 @@
 		}
 	}
 
-	private static class BlockInfo {
-		final ByteBuffer bb;
-		final int block;
-		public BlockInfo(ByteBuffer bb, int block) {
-			this.bb = bb;
-			this.block = block;
-		}
-	}
-	
 	private int blockAddressBits;
 	private int segmentAddressBits;
 	private int segmentSize;
 	private int blockSize;
 	private int blockCount;
 	private ThreadLocal<ByteBuffer>[] buffers;
-	private BlockInfo[] bufferCache;
 	
 	/**
 	 * Creates a new {@link BlockByteBuffer} where each buffer segment will be
@@ -88,8 +78,6 @@
 		if (lastSegmentSize > 0) {
 			buffers[fullSegments] = new ThreadLocalByteBuffer(allocate(lastSegmentSize, direct));
 		}
-		int logSize = 32 - Integer.numberOfLeadingZeros(blockCount);
-		bufferCache = new BlockInfo[Math.min(logSize, 20)];
 	}
 
 	public static ByteBuffer allocate(int size, boolean direct) {
@@ -106,7 +94,7 @@
 	}
 	
 	/**
-	 * Return a buffer containing the given start byte.
+	 * Return a buffer positioned at the given start byte.
 	 * It is assumed that the caller will handle blocks in
 	 * a thread safe manner.
 	 * @param startIndex
@@ -116,21 +104,12 @@
 		if (block < 0 || block >= blockCount) {
 			throw new IndexOutOfBoundsException("Invalid block " + block); //$NON-NLS-1$
 		}
-		int cacheIndex = block&(bufferCache.length -1);
-		BlockInfo info = bufferCache[cacheIndex];
-		if (info != null && info.block == block) {
-			info.bb.rewind();
-			return info.bb;
-		}
 		int segment = block>>(segmentAddressBits-blockAddressBits);
 		ByteBuffer bb = buffers[segment].get();
-		bb.limit(bb.capacity());
+		bb.rewind();
 		int position = (block<<blockAddressBits)&(segmentSize-1);
-		bb.position(position);
 		bb.limit(position + blockSize);
-		bb = bb.slice();
-		info = new BlockInfo(bb, block);
-		bufferCache[cacheIndex] = info;
+		bb.position(position);
 		return bb;
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -53,7 +53,6 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidRuntimeException;
 import org.teiid.core.util.ExecutorUtils;
-import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.logging.LogConstants;
 import org.teiid.logging.LogManager;
 import org.teiid.logging.MessageLevel;
@@ -166,27 +165,27 @@
 			if (index >= MAX_INDIRECT) {
 				position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
 				ByteBuffer next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT, value, mode);
-				if (next != info) {
+				if (next != null) {
 					info = next;
 					//should have traversed to the secondary
 					int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
-					position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
-					if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_SIZE) {
+					position = info.position() + indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
+					if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < info.limit()) {
 						info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
 					}
 					next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT + indirectAddressBlock * ADDRESSES_PER_BLOCK,  value, mode);
-					if (next != info) {
+					if (next != null) {
 						info = next;
-						position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
+						position = info.position() + ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
 					}
 				}
 			} else if (index >= DIRECT_POINTERS) {
 				//indirect
 				position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
 				ByteBuffer next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS, value, mode);
-				if (next != info) {
+				if (next != null) {
 					info = next;
-					position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
+					position = next.position() + (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
 				}
 			} else {
 				position = BYTES_PER_BLOCK_ADDRESS*index;
@@ -194,7 +193,7 @@
 			if (mode == Mode.ALLOCATE) {
 				dataBlock = nextBlock(true);
 				info.putInt(position, dataBlock);
-				if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_SIZE) {
+				if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < info.limit()) {
 					//maintain the invariant that the next pointer is empty
 					info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
 				}
@@ -215,7 +214,7 @@
 					buf.putInt(position, sib_index);
 				} else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
 					freeDataBlock(sib_index);
-					return buf;
+					return null;
 				}
 			}
 			return blockByteBuffer.getByteBuffer(sib_index);
@@ -273,7 +272,7 @@
 					ByteBuffer bb = getInodeBlock();
 					bb.putInt(EMPTY_ADDRESS);
 				}
-				inodeBuffer = inodeByteBuffer.getByteBuffer(inode);
+				inodeBuffer = inodeByteBuffer.getByteBuffer(inode).slice();
 			}
 			return inodeBuffer;
 		}
@@ -299,7 +298,7 @@
 			if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
 				return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
 			}
-			bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock);
+			bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock).slice();
 			freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
 			freeDataBlock(doublyIndirectIndexBlock);
 			return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
@@ -307,7 +306,7 @@
 
 		private boolean freeIndirectBlock(int indirectIndexBlock) {
 			ByteBuffer bb = blockByteBuffer.getByteBuffer(indirectIndexBlock);
-			boolean freedAll = freeBlock(0, bb, ADDRESSES_PER_BLOCK, true);
+			boolean freedAll = freeBlock(bb.position(), bb, ADDRESSES_PER_BLOCK, true);
 			freeDataBlock(indirectIndexBlock);
 			return freedAll;
 		}
@@ -409,7 +408,7 @@
 			if ((size>>1) >= maxStorageObjectSize) {
 				size>>=1;  //adjust the last block size if needed
 			}
-			stores.add(new BlockStore(this.storageManager, size, 30, BufferManagerImpl.CONCURRENCY_LEVEL>>2));
+			stores.add(new BlockStore(this.storageManager, size, 15, BufferManagerImpl.CONCURRENCY_LEVEL>>2));
 			size <<=2;
 		} while (size>>2 < maxStorageObjectSize);
 		this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
@@ -449,20 +448,22 @@
 						if (!map.containsKey(entry.getId())) {
 							return true; //already removed
 						}
-						info = new PhysicalInfo(s.getId(), entry.getId(), EMPTY_ADDRESS);
+						info = new PhysicalInfo(s.getId(), entry.getId(), EMPTY_ADDRESS, (int)readAttempts.get());
+						info.adding = true;
 						map.put(entry.getId(), info);
 					}
 				}
 			}
 			if (!newEntry) {
 				synchronized (info) {
-					if (info.inode == EMPTY_ADDRESS && info.block == EMPTY_ADDRESS) {
+					if (info.adding) {
 						return false; //someone else is responsible for adding this cache entry
 					}
 					if (info.evicting || info.inode != EMPTY_ADDRESS
 							|| !shouldPlaceInMemoryBuffer(0, info)) {
 						return true; //safe to remove from tier 1 
 					}
+					info.adding = true;
 					//second chance re-add to the cache, we assume that serialization would be faster than a disk read
 				}
 			}
@@ -492,19 +493,35 @@
             synchronized (map) {
             	if (physicalMapping.containsKey(s.getId()) && map.containsKey(entry.getId())) {
         			synchronized (info) {
+        				//set the size first, since it may raise an exceptional condition
+            			info.setSize(bos.getBytesWritten());
             			info.inode = blockManager.getInode();
-            			info.setSize(bos.getBytesWritten());
-                		memoryBufferEntries.touch(info, newEntry);
+        				memoryBufferEntries.add(info);
 					}
             		success = true;
             	}
 			}
 		} catch (Throwable e) {
-			LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ entry.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+			if (e == PhysicalInfo.sizeChanged) {
+				//System.out.println("size changed " + info.inode + " " + info.block + " " + info);
+				//entries are mutable after adding, the original should be removed shortly so just ignore
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId() +" changed size since first persistence, keeping the original."); //$NON-NLS-1$ //$NON-NLS-2$
+			} else {
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting, attempts to read "+ entry.getId() +" later will result in an exception."); //$NON-NLS-1$ //$NON-NLS-2$
+			}
 		} finally {
 			if (hasPermit) {
 				memoryWritePermits.release();
 			}
+			if (info != null) {
+				synchronized (info) {
+					info.adding = false;
+					if (!success && blockManager != null) {
+						//invalidate for safety
+						info.inode = EMPTY_ADDRESS;
+					}
+				}
+			}
 			if (!success && blockManager != null) {
 				blockManager.free(false);
 			}
@@ -558,23 +575,21 @@
 		if (serializer == null) {
 			return null;
 		}
-		long currentTime = readAttempts.incrementAndGet();
 		InputStream is = null;
-		boolean inStorage = false;
 		try {
 			synchronized (info) {
 				assert !info.pinned && info.loading; //load should be locked
 				await(info, true, false); //not necessary, but should make things safer
 				if (info.inode != EMPTY_ADDRESS) {
 					info.pinned = true;
-					memoryBufferEntries.touch(info, false); 
+					memoryBufferEntries.touch(info); 
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 						LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
 					is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
 				} else if (info.block != EMPTY_ADDRESS) {
-					inStorage = true;
+					memoryBufferEntries.recordAccess(info);
 					storageReads.incrementAndGet();
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 						LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
@@ -587,28 +602,6 @@
 					return null;
 				}
 			}
-			if (inStorage && shouldPlaceInMemoryBuffer(currentTime, info) && this.memoryWritePermits.tryAcquire()) {
-				BlockManager manager = null;
-				boolean success = false;
-				try {
-					manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
-					ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
-		            ObjectConverterUtil.write(os, is, -1);
-		            synchronized (info) {
-		            	assert !info.pinned;
-			            info.inode = manager.getInode();
-			            info.pinned = true;
-						memoryBufferEntries.touch(info, false);
-						is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
-					}
-					success = true;
-				} finally {
-					this.memoryWritePermits.release();
-					if (!success && manager != null) {
-						manager.free(false);
-					}
-				}
-			}
 			ObjectInputStream ois = new ObjectInputStream(is);
 			CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), ois.readInt(), serializer.deserialize(ois), ref, true);
 			return ce;
@@ -765,6 +758,9 @@
 				}
 				if (block != EMPTY_ADDRESS) {
 					if (demote) {
+						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+							LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Assigning storage data block", block, "of size", sizeBasedStores[info.sizeIndex].blockSize); //$NON-NLS-1$ //$NON-NLS-2$
+						}
 						info.block = block;
 					} else {
 						BlockStore blockStore = sizeBasedStores[info.sizeIndex];
@@ -903,6 +899,9 @@
  * Currently should be 48 bytes.
  */
 final class PhysicalInfo extends BaseCacheEntry {
+	
+	static final Exception sizeChanged = new Exception();  
+	
 	final Long gid;
 	//the memory inode and block count
 	int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
@@ -914,20 +913,26 @@
 	boolean pinned; //indicates that the entry is being read
 	boolean evicting; //indicates that the entry will be moved out of the memory buffer
 	boolean loading; //used by tier 1 cache to prevent double loads
+	boolean adding; //used to prevent double adds
 	
-	public PhysicalInfo(Long gid, Long id, int inode) {
-		super(new CacheKey(id, 0, 0));
+	public PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
+		super(new CacheKey(id, lastAccess, 0));
 		this.inode = inode;
 		this.gid = gid;
 	}
 	
-	public void setSize(int size) {
-		this.memoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
-		int blocks = memoryBlockCount;
-		this.sizeIndex = 0;
-		while (blocks >= 1) {
+	public void setSize(int size) throws Exception {
+		int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
+		if (this.memoryBlockCount != 0) {
+			if (newMemoryBlockCount != memoryBlockCount) {
+				throw sizeChanged; 
+			}
+			return; //no changes
+		}
+		this.memoryBlockCount = newMemoryBlockCount;
+		while (newMemoryBlockCount >= 1) {
 			this.sizeIndex++;
-			blocks>>=2;
+			newMemoryBlockCount>>=2;
 		}
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -203,13 +203,13 @@
 					old = fastGet(previous, prefersMemory.get(), true);
 				}
 			}
-			CacheKey key = new CacheKey(oid, 0, old!=null?old.getKey().getOrderingValue():0);
+			CacheKey key = new CacheKey(oid, (int)readAttempts.get(), old!=null?old.getKey().getOrderingValue():0);
 			CacheEntry ce = new CacheEntry(key, sizeEstimate, batch, this.ref, false);
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
 			}
 			cache.addToCacheGroup(id, ce.getId());
-			addMemoryEntry(ce);
+			addMemoryEntry(ce, true);
 			return oid;
 		}
 
@@ -285,7 +285,7 @@
 					cache.remove(this.id, batch);
 				}
 				if (retain) {
-					addMemoryEntry(ce);
+					addMemoryEntry(ce, false);
 				}
 			} finally {
 				cache.unlockForLoad(o);
@@ -693,8 +693,10 @@
 			if (ce == null) {
 				break;
 			}
-			if (!memoryEntries.containsKey(ce.getId())) {
-				continue; //not currently a valid eviction
+			synchronized (ce) {
+				if (!memoryEntries.containsKey(ce.getId())) {
+					continue; //not currently a valid eviction
+				}
 			}
 			boolean evicted = true;
 			try {
@@ -763,7 +765,7 @@
 					//there is a minute chance the batch was evicted
 					//this call ensures that we won't leak
 					if (memoryEntries.containsKey(batch)) {
-						evictionQueue.touch(ce, false);
+						evictionQueue.touch(ce);
 					}
 				} else {
 					evictionQueue.remove(ce);
@@ -791,7 +793,7 @@
 		if (ce != null && ce.getObject() != null) {
 			referenceHit.getAndIncrement();
 			if (retain) {
-				addMemoryEntry(ce);
+				addMemoryEntry(ce, false);
 			} else {
 				BufferManagerImpl.this.remove(ce, false);
 			}
@@ -823,11 +825,15 @@
 		}
 	}
 	
-	void addMemoryEntry(CacheEntry ce) {
+	void addMemoryEntry(CacheEntry ce, boolean initial) {
 		persistBatchReferences();
 		synchronized (ce) {
 			memoryEntries.put(ce.getId(), ce);
-			evictionQueue.touch(ce, true);
+			if (initial) {
+				evictionQueue.add(ce);
+			} else {
+				evictionQueue.touch(ce);
+			}
 		}
 		activeBatchKB.getAndAdd(ce.getSizeEstimate());
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -30,6 +30,7 @@
 	
     protected ByteBuffer buf;
     protected int bytesWritten;
+    private int startPosition;
     
     public ExtensibleBufferedOutputStream() {
 	}
@@ -45,6 +46,7 @@
 	private void ensureBuffer() {
 		if (buf == null) {
     		buf = newBuffer();
+    		startPosition = buf.position();
     	}
 	}
 
@@ -63,8 +65,11 @@
     }
 
 	public void flush() throws IOException {
-		if (buf != null && buf.position() > 0) {
-			bytesWritten += flushDirect(buf.position());
+		if (buf != null) {
+			int bytes = buf.position() - startPosition;
+			if (bytes > 0) {
+				bytesWritten += flushDirect(bytes);
+			}
 		}
 		buf = null;
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -60,11 +60,13 @@
 		return evictionQueue.remove(value.getKey()) != null;
 	}
 	
-	public void touch(V value, boolean initial) {
-		if (!initial) {
-			initial = evictionQueue.remove(value.getKey()) == null;			
-		}
-		recordAccess(value, initial);
+	public boolean add(V value) {
+		return evictionQueue.put(value.getKey(), value) == null;
+	}
+	
+	public void touch(V value) {
+		evictionQueue.remove(value.getKey());
+		recordAccess(value);
 		evictionQueue.put(value.getKey(), value);
 	}
 		
@@ -85,14 +87,13 @@
 		return null;
 	}
 	
-	protected void recordAccess(V value, boolean initial) {
-		assert Thread.holdsLock(value);
+	/**
+     * Callers should be synchronized on value
+     */
+	public void recordAccess(V value) {
 		CacheKey key = value.getKey();
 		int lastAccess = key.getLastAccess();
 		long currentClock = clock.get();
-		if (initial && lastAccess == 0) {
-			return; //we just want to timestamp this as created and not give it an ordering value
-		}
 		float orderingValue = key.getOrderingValue();
 		orderingValue = computeNextOrderingValue(currentClock, lastAccess,
 				orderingValue);
@@ -128,7 +129,7 @@
 				break;
 			}
 		}
-		this.maxInterval = i-1;
+		this.maxInterval = 1<<(i-1);
 	}
 	
 }

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -90,6 +90,16 @@
 		ce = get(cache, 3l, s);
 		assertEquals(cacheObject, ce.getObject());
 		
+		//repeat the test to ensure proper cleanup
+		ce = new CacheEntry(4l);
+		cacheObject = Integer.valueOf(60000);
+		ce.setObject(cacheObject);
+		cache.addToCacheGroup(s.getId(), ce.getId());
+		cache.add(ce, s);
+		
+		ce = get(cache, 4l, s);
+		assertEquals(cacheObject, ce.getObject());
+		
 		cache.removeCacheGroup(1l);
 		
 		assertEquals(0, cache.getDataBlocksInUse());

Modified: trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -94,8 +94,8 @@
         svc.start();
         
         BufferManager mgr = svc.getBufferManager();
-        assertEquals(6570, mgr.getSchemaSize(schema));
-        assertEquals(256, mgr.getProcessorBatchSize(schema));
+        assertEquals(3285, mgr.getSchemaSize(schema));
+        assertEquals(128, mgr.getProcessorBatchSize(schema));
     }
     
 }

Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java	2011-10-20 01:51:50 UTC (rev 3571)
@@ -130,6 +130,7 @@
         	BufferServiceImpl bsi = new BufferServiceImpl();
         	bsi.setDiskDirectory(UnitTestUtil.getTestScratchPath());
         	this.dqp.setBufferService(bsi);
+        	bsi.start();
         }
         
         this.dqp.setCacheFactory(new DefaultCacheFactory());



More information about the teiid-commits mailing list