[teiid-commits] teiid SVN: r1788 - in trunk/engine/src/main/java/com/metamatrix/common/buffer: impl and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri Jan 29 17:29:27 EST 2010


Author: shawkins
Date: 2010-01-29 17:29:27 -0500 (Fri, 29 Jan 2010)
New Revision: 1788

Modified:
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
Log:
TEIID-913 updating the buffermanager to defer more writes.

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-29 20:42:18 UTC (rev 1787)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-29 22:29:27 UTC (rev 1788)
@@ -246,10 +246,7 @@
 	}
 	
 	public void close() throws MetaMatrixComponentException {
-		//if there is only a single batch, let it stay in memory
-		if (!this.batches.isEmpty()) { 
-			saveBatch(true, false);
-		}
+		saveBatch(true, false);
 		this.isFinal = true;
 	}
 	

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-01-29 20:42:18 UTC (rev 1787)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-01-29 22:29:27 UTC (rev 1788)
@@ -31,10 +31,13 @@
 import java.lang.ref.WeakReference;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -69,12 +72,37 @@
 
 /**
  * <p>Default implementation of BufferManager.</p>
- * Responsible for creating/tracking TupleBuffers and providing access to the StorageManager
+ * Responsible for creating/tracking TupleBuffers and providing access to the StorageManager.
+ * </p>
+ * The buffering strategy attempts to purge batches from the least recently used TupleBuffer
+ * from before (which wraps around circularly) the last used batch.  This attempts to compensate 
+ * for our tendency to read buffers in a forward manner.  If our processing algorithms are changed 
+ * to use alternating ascending/descending access, then the buffering approach could be replaced 
+ * with a simple LRU.
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
 	
 	private static final int IO_BUFFER_SIZE = 1 << 14;
 	
+	/**
+	 * Holder for active batches
+	 */
+	private class TupleBufferInfo {
+		TreeMap<Integer, ManagedBatchImpl> batches = new TreeMap<Integer, ManagedBatchImpl>();
+		Integer lastUsed = null;
+		
+		ManagedBatchImpl removeBatch(int row) {
+			ManagedBatchImpl result = batches.remove(row);
+			if (result != null) {
+				activeBatchCount--;
+				if (toPersistCount > 0) {
+					toPersistCount--;
+				}
+			}
+			return result;
+		}
+	}
+	
 	private final class ManagedBatchImpl implements ManagedBatch {
 		final private String id;
 		final private FileStore store;
@@ -83,23 +111,57 @@
 		private boolean persistent;
 		private volatile TupleBatch pBatch;
 		private Reference<TupleBatch> batchReference;
+		private int beginRow;
 		
 		public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) throws MetaMatrixComponentException {
             LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", batchAdded.incrementAndGet()); //$NON-NLS-1$
 			this.id = id;
 			this.store = store;
 			this.pBatch = batch;
-			if (batch.getBeginRow() == 1) {
-				activeBatches.add(this);
-			} else {
-				this.persist(false);
-			}
+			this.beginRow = batch.getBeginRow();
+			addToCache(false);
 			persistBatchReferences();
 		}
 
+		private void addToCache(boolean update) {
+			synchronized (activeBatches) {
+				activeBatchCount++;
+				TupleBufferInfo tbi = null;
+				if (update) {
+					tbi = activeBatches.remove(this.id);
+				} else {
+					tbi = activeBatches.get(this.id);
+				}
+				if (tbi == null) {
+					tbi = new TupleBufferInfo();
+					update = true;
+				} 
+				if (update) {
+					activeBatches.put(this.id, tbi);
+				}
+				Assertion.isNull(tbi.batches.put(this.beginRow, this));
+			}
+		}
+
 		@Override
 		public TupleBatch getBatch(boolean cache, String[] types) throws MetaMatrixComponentException {
-			readAttempts.getAndIncrement();
+			LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk", readAttempts.incrementAndGet(), "reference hits", referenceHit.get()); //$NON-NLS-1$ //$NON-NLS-2$
+			synchronized (activeBatches) {
+				TupleBufferInfo tbi = activeBatches.remove(this.id);
+				if (tbi != null) { 
+					boolean put = true;
+					if (!cache) {
+						tbi.removeBatch(this.beginRow);
+						if (tbi.batches.isEmpty()) {
+							put = false;
+						}
+					}
+					if (put) {
+						tbi.lastUsed = this.beginRow;
+						activeBatches.put(this.id, tbi);
+					}
+				}
+			}
 			synchronized (this) {
 				if (this.batchReference != null && this.pBatch == null) {
 					TupleBatch result = this.batchReference.get();
@@ -108,23 +170,19 @@
 							softCache.remove(this);
 							this.batchReference.clear();
 						} 
+						referenceHit.getAndIncrement();
 						return result;
 					}
 				}
 
 				TupleBatch batch = this.pBatch;
 				if (batch != null){
-					activeBatches.remove(this);
-					if (cache) {
-						activeBatches.add(this);
-					} 
 					return batch;
 				}
 			}
 			persistBatchReferences();
             LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk", readCount.incrementAndGet()); //$NON-NLS-1$
 			synchronized (this) {
-				//Resurrect from disk
 				try {
 		            ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(store.createInputStream(this.offset), IO_BUFFER_SIZE));
 		            TupleBatch batch = new TupleBatch();
@@ -133,7 +191,7 @@
 			        batch.setDataTypes(null);
 			        if (cache) {
 			        	this.pBatch = batch;
-			        	activeBatches.add(this);
+			        	addToCache(true);
 			        }
 					return batch;
 		        } catch(IOException e) {
@@ -145,19 +203,55 @@
 		}
 
 		public void persistBatchReferences() throws MetaMatrixComponentException {
-			persistOneBatch(softCache, reserveBatches * 2, false);
-			persistOneBatch(activeBatches, reserveBatches, true);
-		}
-
-		private void persistOneBatch(Set<ManagedBatchImpl> set, int requiredSize, boolean createSoft) throws MetaMatrixComponentException {
 			ManagedBatchImpl mb = null;
-			synchronized (set) {
-				if (set.size() > requiredSize) {
-					Iterator<ManagedBatchImpl> iter = set.iterator();
+			boolean createSoft = false;
+			/*
+			 * If we are over our limit, collect half of the batches. 
+			 */
+			synchronized (activeBatches) {
+				if (activeBatchCount > reserveBatches && toPersistCount == 0) {
+					toPersistCount = activeBatchCount / 2;
+				}
+			}
+			while (true) {
+				synchronized (activeBatches) {
+					if (activeBatchCount == 0 || toPersistCount == 0) {
+						toPersistCount = 0;
+						break;
+					}
+					Iterator<TupleBufferInfo> iter = activeBatches.values().iterator();
+					TupleBufferInfo tbi = iter.next();
+					Map.Entry<Integer, ManagedBatchImpl> entry = null;
+					if (tbi.lastUsed != null) {
+						entry = tbi.batches.floorEntry(tbi.lastUsed - 1);
+					}
+					if (entry == null) {
+						entry = tbi.batches.pollLastEntry();
+					} else {
+						createSoft = true;
+						tbi.batches.remove(entry.getKey());
+					}
+					if (tbi.batches.isEmpty()) {
+						iter.remove();
+					}
+					activeBatchCount--;
+					toPersistCount--;
+					mb = entry.getValue();
+				}
+				persist(createSoft, mb);
+			}
+			synchronized (softCache) {
+				if (softCache.size() > reserveBatches) {
+					Iterator<ManagedBatchImpl> iter = softCache.iterator();
 					mb = iter.next();
 					iter.remove();
 				}
 			}
+			persist(false, mb);
+		}
+
+		private void persist(boolean createSoft, ManagedBatchImpl mb)
+				throws MetaMatrixComponentException {
 			try {
 				if (mb != null) {
 					mb.persist(createSoft);
@@ -202,7 +296,12 @@
 		}
 
 		public void remove() {
-			activeBatches.remove(this);
+			synchronized (activeBatches) {
+				TupleBufferInfo tbi = activeBatches.get(this.id);
+				if (tbi != null && tbi.removeBatch(this.beginRow) != null && tbi.batches.isEmpty()) {
+					activeBatches.remove(this.id);
+				}
+			}
 			softCache.remove(this);
 			pBatch = null;
 			if (batchReference != null) {
@@ -222,11 +321,13 @@
     private int maxProcessingBatches = BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
     private int reserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
     private int maxReserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
-
+    
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
     
-	private Set<ManagedBatchImpl> activeBatches = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
+    private int toPersistCount = 0;
+    private int activeBatchCount = 0;
+    private Map<String, TupleBufferInfo> activeBatches = new LinkedHashMap<String, TupleBufferInfo>();
 	private Set<ManagedBatchImpl> softCache = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
     
     private StorageManager diskMgr;
@@ -236,6 +337,7 @@
     private AtomicInteger readCount = new AtomicInteger();
 	private AtomicInteger writeCount = new AtomicInteger();
 	private AtomicInteger readAttempts = new AtomicInteger();
+	private AtomicInteger referenceHit = new AtomicInteger();
 	
     public int getMaxProcessingBatches() {
 		return maxProcessingBatches;



More information about the teiid-commits mailing list