[teiid-commits] teiid SVN: r3494 - in branches/7.4.x/engine/src: test/java/org/teiid/common/buffer and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Sep 14 13:57:13 EDT 2011


Author: shawkins
Date: 2011-09-14 13:57:13 -0400 (Wed, 14 Sep 2011)
New Revision: 3494

Modified:
   branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
Log:
TEIID-1753 addressing compaction issues

Modified: branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-13 23:22:55 UTC (rev 3493)
+++ branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-14 17:57:13 UTC (rev 3494)
@@ -36,12 +36,14 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -95,7 +97,8 @@
 	private final class BatchManagerImpl implements BatchManager {
 		private final String id;
 		private volatile FileStore store;
-		private Map<Long, long[]> physicalMapping = new ConcurrentHashMap<Long, long[]>();
+		private Map<Long, long[]> physicalMapping = new HashMap<Long, long[]>();
+		private ConcurrentSkipListSet<Long> freed = new ConcurrentSkipListSet<Long>(); 
 		private ReadWriteLock compactionLock = new ReentrantReadWriteLock();
 		private AtomicLong unusedSpace = new AtomicLong();
 		private int[] lobIndexes;
@@ -109,6 +112,13 @@
 			this.sizeUtility = new SizeUtility();
 		}
 		
+		private void freeBatch(Long id) {
+			long[] info = physicalMapping.remove(id);
+			if (info != null) { 
+				unusedSpace.addAndGet(info[1]); 
+			}
+		}
+		
 		public FileStore createStorage(String prefix) {
 			return createFileStore(id+prefix);
 		}
@@ -123,7 +133,7 @@
 		}
 		
 		private boolean shouldCompact(long offset) {
-			return offset > COMPACTION_THRESHOLD && unusedSpace.get() * 4 > offset * 3;
+			return offset > compactionThreshold && unusedSpace.get() * 4 > offset * 3;
 		}
 		
 		private long getOffset() throws TeiidComponentException {
@@ -131,45 +141,38 @@
 			if (!shouldCompact(offset)) {
 				return offset;
 			}
-			try {
-				this.compactionLock.writeLock().lock();
-				offset = store.getLength();
-				//retest the condition to ensure that compaction is still needed
-				if (!shouldCompact(offset)) {
-					return offset;
+			offset = store.getLength();
+			FileStore newStore = createFileStore(id);
+			newStore.setCleanupReference(this);
+			byte[] buffer = new byte[IO_BUFFER_SIZE];
+			List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
+			Collections.sort(values, new Comparator<long[]>() {
+				@Override
+				public int compare(long[] o1, long[] o2) {
+					return Long.signum(o1[0] - o2[0]);
 				}
-				FileStore newStore = createFileStore(id);
-				newStore.setCleanupReference(this);
-				byte[] buffer = new byte[IO_BUFFER_SIZE];
-				List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
-				Collections.sort(values, new Comparator<long[]>() {
-					@Override
-					public int compare(long[] o1, long[] o2) {
-						return Long.signum(o1[0] - o2[0]);
-					}
-				});
-				for (long[] info : values) {
-					long oldOffset = info[0];
-					info[0] = newStore.getLength();
-					int size = (int)info[1];
-					while (size > 0) {
-						int toWrite = Math.min(IO_BUFFER_SIZE, size);
-						store.readFully(oldOffset, buffer, 0, toWrite);
-						newStore.write(buffer, 0, toWrite);
-						size -= toWrite;
-					}
+			});
+			for (long[] info : values) {
+				long oldOffset = info[0];
+				info[0] = newStore.getLength();
+				int size = (int)info[1];
+				while (size > 0) {
+					int toWrite = Math.min(IO_BUFFER_SIZE, size);
+					store.readFully(oldOffset, buffer, 0, toWrite);
+					newStore.write(buffer, 0, toWrite);
+					size -= toWrite;
+					oldOffset += toWrite;
 				}
-				store.remove();
-				store = newStore;
-				long oldOffset = offset;
-				offset = store.getLength();
-				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-					LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
-				}
-				return offset;
-			} finally {
-				this.compactionLock.writeLock().unlock();
 			}
+			store.remove();
+			store = newStore;
+			long oldOffset = offset;
+			offset = store.getLength();
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+			}
+			unusedSpace.set(0);
+			return offset;
 		}
 
 		@Override
@@ -298,7 +301,6 @@
 				try {
 					this.batchManager.compactionLock.readLock().lock();
 					long[] info = batchManager.physicalMapping.get(this.id);
-					Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
 					ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
 		            batch = new TupleBatch();
 		            batch.setDataTypes(types);
@@ -341,16 +343,20 @@
 								lobManager.updateReferences(batchManager.lobIndexes, tuple);
 							}
 						}
-						synchronized (batchManager.store) {
-							offset = batchManager.getOffset();
-							OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
-				            ObjectOutputStream oos = new ObjectOutputStream(fsos);
-				            batch.writeExternal(oos);
-				            oos.close();
-				            long size = batchManager.store.getLength() - offset;
-				            long[] info = new long[] {offset, size};
-				            batchManager.physicalMapping.put(this.id, info);
+						batchManager.compactionLock.writeLock().lock();
+						lockheld = true;
+						Long free = null;
+						while ((free = batchManager.freed.pollFirst()) != null) {
+							batchManager.freeBatch(free);
 						}
+						offset = batchManager.getOffset();
+						OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
+			            ObjectOutputStream oos = new ObjectOutputStream(fsos);
+			            batch.writeExternal(oos);
+			            oos.close();
+			            long size = batchManager.store.getLength() - offset;
+			            long[] info = new long[] {offset, size};
+			            batchManager.physicalMapping.put(this.id, info);
 						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 							LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "batch written starting at:", offset); //$NON-NLS-1$
 						}
@@ -383,12 +389,17 @@
 					}
 				}
 			}
-			long[] info = batchManager.physicalMapping.remove(id);
-			if (info != null) {
-				batchManager.unusedSpace.addAndGet(info[1]); 
-			}
 			activeBatch = null;
 			batchReference = null;
+			if (batchManager.compactionLock.writeLock().tryLock()) {
+				try {
+					batchManager.freeBatch(id);
+				} finally {
+					batchManager.compactionLock.writeLock().unlock();
+				}
+			} else {
+				batchManager.freed.add(id);
+			}
 		}
 		
 		@Override
@@ -407,6 +418,7 @@
     private volatile int reserveBatchKB;
     private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
     private boolean useWeakReferences = true;
+    private int compactionThreshold = COMPACTION_THRESHOLD;
 
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
@@ -642,7 +654,7 @@
 			try {
 				mb.persist();
 			} catch (TeiidComponentException e) {
-				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
 			}
 		}
 	}
@@ -651,8 +663,6 @@
 	public int getSchemaSize(List<? extends Expression> elements) {
 		int total = 0;
 		boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
-		//we make a assumption that the average column size under 64bits is approximately 128bytes
-		//this includes alignment, row/array, and reference overhead
 		for (Expression element : elements) {
 			Class<?> type = element.getType();
 			total += SizeUtility.getSize(isValueCacheEnabled, type);
@@ -703,4 +713,8 @@
 		this.useWeakReferences = useWeakReferences;
 	}
 	
+	public void setCompactionThreshold(int compactionThreshold) {
+		this.compactionThreshold = compactionThreshold;
+	}
+	
 }

Modified: branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-09-13 23:22:55 UTC (rev 3493)
+++ branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-09-14 17:57:13 UTC (rev 3494)
@@ -100,5 +100,34 @@
 		}
 				
 	}
+	
+	/**
+	 * Forces the logic through several compaction cycles by using large strings
+	 * @throws TeiidComponentException
+	 */
+	@Test public void testCompaction() throws TeiidComponentException {
+		BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+		bm.setProcessorBatchSize(32);
+		bm.setMaxReserveKB(0);//force all to disk
+		bm.setCompactionThreshold(0);
+		bm.initialize();
+		
+		ElementSymbol e1 = new ElementSymbol("x");
+		e1.setType(String.class);
+		List<ElementSymbol> elements = Arrays.asList(e1);
+		STree map = bm.createSTree(elements, "1", 1);
+		
+		int size = 1000;
+		
+		for (int i = 0; i < size; i++) {
+			assertNull(map.insert(Arrays.asList(new String(new byte[1000])), InsertMode.ORDERED, size));
+			assertEquals(i + 1, map.getRowCount());
+		}
+		
+		for (int i = 0; i < size; i++) {
+			assertNotNull(map.remove(Arrays.asList(new String(new byte[1000]))));
+		}
+				
+	}
 
 }



More information about the teiid-commits mailing list