[teiid-commits] teiid SVN: r3573 - in trunk/engine/src: test/java/org/teiid/common/buffer/impl and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Oct 20 11:37:18 EDT 2011


Author: shawkins
Date: 2011-10-20 11:37:18 -0400 (Thu, 20 Oct 2011)
New Revision: 3573

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
Log:
TEIID-1750 fixing a bug with positional filestore writes and fixing size indexing.

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java	2011-10-20 15:37:18 UTC (rev 3573)
@@ -34,12 +34,10 @@
 	int blockIndex;
 	ByteBuffer buf;
 	boolean done;
-	private final boolean threadSafe;
 
-	BlockInputStream(BlockManager manager, int blockCount, boolean threadSafe) {
+	BlockInputStream(BlockManager manager, int blockCount) {
 		this.manager = manager;
 		this.maxBlock = blockCount;
-		this.threadSafe = threadSafe;
 	}
 
 	@Override
@@ -58,9 +56,6 @@
 				return;
 			}
 			buf = manager.getBlock(blockIndex++);
-			if (threadSafe) {
-				buf = buf.duplicate();
-			}
 		}
 	}
 

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-20 15:37:18 UTC (rev 3573)
@@ -405,12 +405,9 @@
 		List<BlockStore> stores = new ArrayList<BlockStore>();
 		int size = BLOCK_SIZE;
 		do {
-			if ((size>>1) >= maxStorageObjectSize) {
-				size>>=1;  //adjust the last block size if needed
-			}
-			stores.add(new BlockStore(this.storageManager, size, 15, BufferManagerImpl.CONCURRENCY_LEVEL>>2));
-			size <<=2;
-		} while (size>>2 < maxStorageObjectSize);
+			stores.add(new BlockStore(this.storageManager, size, 30, BufferManagerImpl.CONCURRENCY_LEVEL));
+			size <<=1;
+		} while ((size>>1) < maxStorageObjectSize);
 		this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
 	}
 	
@@ -503,7 +500,6 @@
 			}
 		} catch (Throwable e) {
 			if (e == PhysicalInfo.sizeChanged) {
-				//System.out.println("size changed " + info.inode + " " + info.block + " " + info);
 				//entries are mutable after adding, the original should be removed shortly so just ignore
 				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId() +" changed size since first persistence, keeping the original."); //$NON-NLS-1$ //$NON-NLS-2$
 			} else {
@@ -587,7 +583,7 @@
 						LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
-					is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
+					is = new BlockInputStream(manager, info.memoryBlockCount);
 				} else if (info.block != EMPTY_ADDRESS) {
 					memoryBufferEntries.recordAccess(info);
 					storageReads.incrementAndGet();
@@ -720,7 +716,7 @@
 		try {
 			if (demote && block == EMPTY_ADDRESS) {
 				storageWrites.getAndIncrement();
-				BlockInputStream is = new BlockInputStream(bm, memoryBlockCount, false); //we know this can always be single threaded
+				BlockInputStream is = new BlockInputStream(bm, memoryBlockCount); 
 				BlockStore blockStore = sizeBasedStores[sizeIndex];
 				block = getAndSetNextClearBit(blockStore);
 				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
@@ -930,9 +926,9 @@
 			return; //no changes
 		}
 		this.memoryBlockCount = newMemoryBlockCount;
-		while (newMemoryBlockCount >= 1) {
+		while (newMemoryBlockCount > 1) {
 			this.sizeIndex++;
-			newMemoryBlockCount>>=2;
+			newMemoryBlockCount = (newMemoryBlockCount>>1) + ((newMemoryBlockCount&0x01)==0?0:1);
 		}
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-20 15:37:18 UTC (rev 3573)
@@ -323,7 +323,7 @@
 		}
 	}
 
-	static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable
+	static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable since it is roughly the same as max active plans
 	private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
 	private static ReferenceQueue<CacheEntry> SOFT_QUEUE = new ReferenceQueue<CacheEntry>();
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java	2011-10-20 15:37:18 UTC (rev 3573)
@@ -180,7 +180,7 @@
 	}
 	
 	/**
-	 * Set to try to always allocate against the first available block in a segment.
+	 * Set to always allocate against the first available block in a segment.
 	 * @param compact
 	 */
 	public void setCompact(boolean compact) {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-10-20 15:37:18 UTC (rev 3573)
@@ -133,11 +133,10 @@
 	            if (bytesUsed > 0) {
 		    		long used = usedBufferSpace.addAndGet(bytesUsed);
 					if (used > maxBufferSpace) {
-						usedBufferSpace.addAndGet(-bytesUsed);
 						//TODO: trigger a compaction before this is thrown
 						throw new IOException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", maxBufferSpace)); //$NON-NLS-1$
 					}
-	            	fileAccess.setLength(bytesUsed);
+	            	fileAccess.setLength(newLength);
 	            	bytesUsed = 0;
 	            }
 	            fileAccess.seek(fileOffset);

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java	2011-10-20 15:37:18 UTC (rev 3573)
@@ -188,4 +188,26 @@
 		return fsc;
 	}
 	
+	@Test public void testSizeIndex() throws Exception {
+		PhysicalInfo info = new PhysicalInfo(1l, 1l, -1, 0);
+		info.setSize(1<<13);
+		assertEquals(0, info.sizeIndex);
+		
+		info = new PhysicalInfo(1l, 1l, -1, 0);
+		info.setSize(1 + (1<<13));
+		assertEquals(1, info.sizeIndex);
+
+		info = new PhysicalInfo(1l, 1l, -1, 0);
+		info.setSize(2 + (1<<15));
+		assertEquals(3, info.sizeIndex);
+	}
+	
+	@Test(expected=Exception.class) public void testSizeChanged() throws Exception {
+		PhysicalInfo info = new PhysicalInfo(1l, 1l, -1, 0);
+		info.setSize(1<<13);
+		assertEquals(0, info.sizeIndex);
+		
+		info.setSize(1 + (1<<13));
+	}
+	
 }

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java	2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java	2011-10-20 15:37:18 UTC (rev 3573)
@@ -59,6 +59,25 @@
         store.remove();
         assertEquals(0, sm.getUsedBufferSpace());
     }
+    
+    @Test public void testPositionalWrite() throws Exception {
+        FileStorageManager sm = getStorageManager(null, null);        
+        String tsID = "0";     //$NON-NLS-1$
+        FileStore store = sm.createFileStore(tsID);
+        byte[] expectedBytes = writeBytes(store, 2048);
+        assertEquals(4096, sm.getUsedBufferSpace());
+        
+        writeBytes(store, 4096);
+        assertEquals(6144, sm.getUsedBufferSpace());
+        
+        byte[] bytesRead = new byte[2048];        
+        store.readFully(2048, bytesRead, 0, bytesRead.length);
+        
+        assertArrayEquals(expectedBytes, bytesRead);
+        
+        store.remove();
+        assertEquals(0, sm.getUsedBufferSpace());
+    }
             
     @Test(expected=IOException.class) public void testMaxSpace() throws Exception {
     	FileStorageManager sm = getStorageManager(null, null); 
@@ -81,15 +100,19 @@
 
     static Random r = new Random();
     
-	static void writeBytes(FileStore store)
+	static void writeBytes(FileStore store) throws IOException {
+		writeBytes(store, store.getLength());
+	}
+
+	static byte[] writeBytes(FileStore store, long start)
 			throws IOException {
 		byte[] bytes = new byte[2048];
         r.nextBytes(bytes);
-        long start = store.getLength(); 
-        store.write(bytes, 0, bytes.length);
+        store.write(start, bytes, 0, bytes.length);
         byte[] bytesRead = new byte[2048];        
         store.readFully(start, bytesRead, 0, bytesRead.length);
         assertTrue(Arrays.equals(bytes, bytesRead));
+        return bytes;
 	}
     
     @Test public void testWritingMultipleFiles() throws Exception {



More information about the teiid-commits mailing list