[teiid-commits] teiid SVN: r3509 - in trunk: engine/src/main/java/org/teiid/common/buffer and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sun Oct 2 23:03:18 EDT 2011


Author: shawkins
Date: 2011-10-02 23:03:18 -0400 (Sun, 02 Oct 2011)
New Revision: 3509

Added:
   trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
Modified:
   trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
   trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java
Log:
TEIID-1750 switching to an LRFU algorithm for caching and preserving the ordering value.  Also preventing test errors in TestSizeUtility.  Adding a ResizingArrayList to better shape batches in memory.

Modified: trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/BatchSerializer.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/client/src/main/java/org/teiid/client/BatchSerializer.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -50,7 +50,7 @@
     
     private static ColumnSerializer defaultSerializer = new ColumnSerializer();
     
-    private static final Map<String, ColumnSerializer> serializers = new HashMap<String, ColumnSerializer>();
+    private static final Map<String, ColumnSerializer> serializers = new HashMap<String, ColumnSerializer>(128);
     static {
         serializers.put(DataTypeManager.DefaultDataTypes.BIG_DECIMAL,   new BigDecimalColumnSerializer());
         serializers.put(DataTypeManager.DefaultDataTypes.BIG_INTEGER,   new BigIntegerColumnSerializer());
@@ -394,7 +394,7 @@
             return new ArrayList<List<Object>>(0);
         } else if (rows > 0) {
             int columns = in.readInt();
-            List<List<Object>> batch = new ArrayList<List<Object>>(rows);
+            List<List<Object>> batch = new ResizingArrayList<List<Object>>(rows);
             int numBytes = rows/8;
             int extraRows = rows % 8;
             for (int currentRow = 0; currentRow < rows; currentRow++) {

Added: trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java	                        (rev 0)
+++ trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -0,0 +1,168 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.client;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.RandomAccess;
+
+/**
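+ * Modified {@link ArrayList} that resizes up and down in steps derived from powers of 2
+ * (either a power of 2 or roughly a power of 2 divided by SQRT(2)).
+ * @param <T> the element type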
+ */
+@SuppressWarnings("unchecked")
+public class ResizingArrayList<T> extends AbstractList<T> implements RandomAccess { //array-backed list; ensureCapacity grows and remove shrinks the backing array
+
+	public static final int MIN_SHRINK_SIZE = 32; //default capacity, and the length remove()/clear() never shrink below
+	
+	protected Object[] elementData; //backing array; only the first size slots hold elements
+	protected int size; //number of elements currently stored
+	
+	public ResizingArrayList() { //starts with MIN_SHRINK_SIZE slots
+		this(MIN_SHRINK_SIZE);
+	}
+	
+	public ResizingArrayList(int initialCapacity) { //allocates exactly initialCapacity slots, no rounding
+		this.elementData = new Object[initialCapacity];
+	}
+	
+    public ResizingArrayList(Collection<? extends T> c) { //copies c; the capacity equals the number of elements copied
+    	elementData = c.toArray();
+    	size = elementData.length;
+    	// c.toArray might (incorrectly) not return Object[] (see 6260652)
+    	if (elementData.getClass() != Object[].class)
+    	    elementData = Arrays.copyOf(elementData, size, Object[].class);
+    }
+	
+	@Override
+	public T get(int index) { //throws IndexOutOfBoundsException when index >= size
+		rangeCheck(index, false);
+		return (T) elementData[index];
+	}
+	
+	public void add(int index, T element) { //inserts at index; index == size appends
+		rangeCheck(index, true);
+		ensureCapacity(size+1); //grow first so the shift below has room
+		System.arraycopy(elementData, index, elementData, index + 1,
+				 size - index); //move the tail one slot to the right
+		elementData[index] = element;
+		size++; //NOTE(review): modCount is never incremented in this class, so inherited iterators cannot detect concurrent modification
+	}
+	
+	protected void ensureCapacity(int capacity) { //grows the backing array so it can hold at least capacity elements
+		if (capacity <= elementData.length) { //already large enough
+			return;
+		}
+	    int newCapacity = 1 << (32 - Integer.numberOfLeadingZeros(capacity - 1)); //smallest power of two >= capacity; NOTE(review): overflows for capacity > 2^30
+	    int lowerCapacity = newCapacity*70/99; //about newCapacity/SQRT(2), since 99/70 approximates SQRT(2)
+	    if (lowerCapacity > capacity) { //take the smaller step when it still leaves room
+	    	newCapacity = lowerCapacity;
+	    }
+        elementData = Arrays.copyOf(elementData, newCapacity);
+	}
+
+	public T set(int index, T element) { //replaces the element at index and returns the previous one
+		rangeCheck(index, false);
+		T old = (T) elementData[index];
+		elementData[index] = element;
+		return old;
+	}
+	
+	@Override
+	public boolean addAll(Collection<? extends T> c) { //appends by delegating with index == size
+		return addAll(size, c);
+	}
+
+	@Override
+	public boolean addAll(int index, Collection<? extends T> c) { //returns true when c is non-empty
+		rangeCheck(index, true);
+        int numNew = c.size();
+        ensureCapacity(size + numNew);
+        for (T t : c) { //NOTE(review): unlike add(int, T) there is no arraycopy here, so for index < size existing elements are overwritten rather than shifted while size still grows by numNew
+			elementData[index++] = t;
+		}
+        size += numNew;
+        return numNew != 0;
+	}
+
+	@Override
+	public T remove(int index) { //removes and returns the element at index, shrinking the array when it is at most half full
+		T oldValue = get(index); //get also performs the range check
+		int numMoved = size - index - 1;
+		if (numMoved > 0) {
+		    System.arraycopy(elementData, index+1, elementData, index, numMoved); //close the gap
+		}
+		elementData[--size] = null; //null out the now-unused last slot
+		int halfLength = elementData.length/2;
+		if (size <= halfLength && elementData.length > MIN_SHRINK_SIZE) { //at most half full and above the floor
+			int newSize = Math.max(halfLength*99/70, MIN_SHRINK_SIZE); //about length/SQRT(2), never below MIN_SHRINK_SIZE
+			Object[] next = new Object[newSize];
+		    System.arraycopy(elementData, 0, next, 0, size);
+		    elementData = next;
+		}
+		return oldValue;
+	}
+	
+    private void rangeCheck(int index, boolean inclusive) { //inclusive permits index == size; negative indexes are not checked here
+		if (index > size || (!inclusive && index == size)) {
+		    throw new IndexOutOfBoundsException("Index: "+index+", Size: "+size); //$NON-NLS-1$ //$NON-NLS-2$
+		}
+    }
+	
+	@Override
+	public void clear() { //the decision is based on size, not on the backing array's length
+		if (size <= MIN_SHRINK_SIZE) { //few elements: null them in place and keep the current array
+			for (int i = 0; i < size; i++) {
+				elementData[i] = null;
+			}
+		} else { //otherwise replace the array with a fresh MIN_SHRINK_SIZE one
+			elementData = new Object[MIN_SHRINK_SIZE];
+		}
+		size = 0;
+	}
+	
+	@Override
+	public Object[] toArray() { //returns a copy trimmed to size
+		return Arrays.copyOf(elementData, size);
+	}
+	
+	public <U extends Object> U[] toArray(U[] a) { //fills a when it is large enough, otherwise returns a new array
+		if (a.length < size) { //a is too small: return a new array of a's runtime type
+			return (U[]) Arrays.copyOf(elementData, size, a.getClass());
+		}
+		System.arraycopy(elementData, 0, a, 0, size);
+        if (a.length > size) { //spare room: mark the end with a null
+            a[size] = null;
+        }
+        return a;
+	}
+
+	@Override
+	public int size() { //number of stored elements, not the backing array length
+		return size;
+	}
+
+}


Property changes on: trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -40,7 +40,7 @@
 	
 	boolean prefersMemory();
 	
-	Long createManagedBatch(List<? extends List<?>> batch) throws TeiidComponentException;
+	Long createManagedBatch(List<? extends List<?>> batch, Long previous, boolean removeOld) throws TeiidComponentException;
 	
 	void remove();
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -24,10 +24,12 @@
 
 import java.lang.ref.WeakReference;
 
-public class CacheEntry {
+public class CacheEntry implements Comparable<CacheEntry>{
 	private boolean persistent;
 	private Object object;
 	private int sizeEstimate;
+	private long lastAccess;
+	private double orderingValue;
 	private WeakReference<? extends Serializer<?>> serializer;
 	private Long id;
 	
@@ -52,6 +54,34 @@
 		this.sizeEstimate = sizeEstimate;
 	}
 	
+	public long getLastAccess() { //stamp last stored via setLastAccess; compareTo uses the field as its second sort key
+		return lastAccess;
+	}
+	
+	public void setLastAccess(long lastAccess) { //plain setter; changing it alters how this entry sorts in compareTo
+		this.lastAccess = lastAccess;
+	}
+	
+	public double getOrderingValue() { //primary sort key in compareTo; lower values sort first
+		return orderingValue;
+	}
+	
+	public void setOrderingValue(double orderingValue) { //plain setter; changing it alters how this entry sorts in compareTo
+		this.orderingValue = orderingValue;
+	}
+	
+	@Override
+	public int compareTo(CacheEntry o) { //orders by orderingValue, then lastAccess, then id (all ascending)
+		int result = (int) Math.signum(orderingValue - o.orderingValue); //primary key
+		if (result == 0) {
+			result = Long.signum(lastAccess - o.lastAccess); //first tie-breaker; NOTE(review): the subtraction can overflow for widely separated values
+			if (result == 0) {
+				return Long.signum(id - o.id); //second tie-breaker; id is a Long and is unboxed here, same overflow caveat
+			}
+		}
+		return result;
+	}
+	
 	public boolean equals(Object obj) {
 		if (obj == this) {
 			return true;

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -34,6 +34,7 @@
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.teiid.client.ResizingArrayList;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidRuntimeException;
 
@@ -103,9 +104,9 @@
 		this.stree = stree;
 		this.id = counter.getAndIncrement();
 		stree.pages.put(this.id, this);
-		this.values = new ArrayList<List<?>>();
+		this.values = new ResizingArrayList<List<?>>();
 		if (!leaf) {
-			children = new ArrayList<SPage>();
+			children = new ResizingArrayList<SPage>();
 		}
 	}
 	
@@ -186,23 +187,27 @@
 		if (values instanceof LightWeightCopyOnWriteList<?>) {
 			values = ((LightWeightCopyOnWriteList<List<?>>)values).getList();
 		}
-		if (managedBatch != null && trackingObject == null) {
-			stree.getBatchManager(children == null).remove(managedBatch);
-			managedBatch = null;
-			trackingObject = null;
-		}
 		if (values.size() < MIN_PERSISTENT_SIZE) {
-			this.values = values;
+			setDirectValues(values);
 			return;
 		} else if (stree.batchInsert && children == null && values.size() < stree.leafSize) {
-			this.values = values;
+			setDirectValues(values);
 			stree.incompleteInsert = this;
 			return;
 		}
 		this.values = null;
+		managedBatch = stree.getBatchManager(children == null).createManagedBatch(values, managedBatch, trackingObject == null);
 		this.trackingObject = null;
-		managedBatch = stree.getBatchManager(children == null).createManagedBatch(values);
 	}
+
+	private void setDirectValues(List<List<?>> values) { //holds values directly on this page rather than in a managed batch
+		if (managedBatch != null && trackingObject == null) {
+			stree.getBatchManager(children == null).remove(managedBatch); //release the batch this page previously referenced
+			managedBatch = null;
+			trackingObject = null; //redundant: the condition above already requires it to be null
+		}
+		this.values = values;
+	}
 	
 	protected void remove(boolean force) {
 		if (managedBatch != null) {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -22,12 +22,12 @@
 
 package org.teiid.common.buffer;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.teiid.client.ResizingArrayList;
 import org.teiid.common.buffer.LobManager.ReferenceMode;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.types.DataTypeManager;
@@ -69,7 +69,7 @@
 	private int rowCount;
 	private boolean isFinal;
     private TreeMap<Integer, Long> batches = new TreeMap<Integer, Long>();
-	private ArrayList<List<?>> batchBuffer;
+	private List<List<?>> batchBuffer;
 	private boolean removed;
 	private boolean forwardOnly;
 
@@ -111,7 +111,7 @@
 		}
 		this.rowCount++;
 		if (batchBuffer == null) {
-			batchBuffer = new ArrayList<List<?>>(batchSize/4);
+			batchBuffer = new ResizingArrayList<List<?>>(batchSize/4);
 		}
 		batchBuffer.add(tuple);
 		if (batchBuffer.size() == batchSize) {
@@ -178,7 +178,7 @@
 		if (batchBuffer == null || batchBuffer.isEmpty() || (!force && batchBuffer.size() < Math.max(1, batchSize / 32))) {
 			return;
 		}
-		Long mbatch = manager.createManagedBatch(batchBuffer);
+		Long mbatch = manager.createManagedBatch(batchBuffer, null, false);
 		this.batches.put(rowCount - batchBuffer.size() + 1, mbatch);
         batchBuffer = null;
 	}

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -32,7 +32,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -79,8 +78,8 @@
  *       - this is not necessary for already persistent batches, since we hold a weak reference
  */
 public class BufferManagerImpl implements BufferManager, StorageManager {
-	
-	private final class BatchManagerImpl implements BatchManager, Serializer<List<? extends List<?>>> {
+
+	final class BatchManagerImpl implements BatchManager, Serializer<List<? extends List<?>>> {
 		final Long id;
 		SizeUtility sizeUtility;
 		private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
@@ -131,7 +130,8 @@
 		}
 		
 		@Override
-		public Long createManagedBatch(List<? extends List<?>> batch)
+		public Long createManagedBatch(List<? extends List<?>> batch,
+				Long previous, boolean removeOld)
 				throws TeiidComponentException {
 			int sizeEstimate = getSizeEstimate(batch);
 			Long oid = batchAdded.getAndIncrement();
@@ -139,7 +139,20 @@
 			ce.setObject(batch);
 			ce.setSizeEstimate(sizeEstimate);
 			ce.setSerializer(this.ref);
-			return addCacheEntry(ce, this);
+			CacheEntry old = null;
+			if (previous != null) {
+				if (removeOld) {
+					old = BufferManagerImpl.this.remove(id, previous, prefersMemory.get());
+				} else {
+					old = fastGet(previous, prefersMemory.get(), true);
+				}
+			}
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
+			}
+			cache.addToCacheGroup(id, ce.getId());
+			addMemoryEntry(ce, old);
+			return oid;
 		}
 
 		@Override
@@ -166,7 +179,7 @@
 		}
 		
 		public int getSizeEstimate(List<? extends List<?>> obj) {
-			return (int) Math.max(1, sizeUtility.getBatchSize(obj) / 1024);
+			return (int) Math.max(1, sizeUtility.getBatchSize(DataTypeManager.isValueCacheEnabled(), obj) / 1024);
 		}
 		
 		@SuppressWarnings("unchecked")
@@ -201,7 +214,7 @@
 				ce.setSerializer(this.ref);
 				ce.setPersistent(true);
 				if (retain) {
-					addMemoryEntry(ce);
+					addMemoryEntry(ce, null);
 				}
 			}	
 			return (List<List<?>>)ce.getObject();
@@ -259,9 +272,32 @@
     
     private AtomicInteger activeBatchKB = new AtomicInteger();
     
-    //tiered memory entries.  the first tier is just a queue of adds/gets.  once accessed again, the entry moves to the tenured tier.
-    private LinkedHashMap<Long, CacheEntry> memoryEntries = new LinkedHashMap<Long, CacheEntry>(16, .75f, false);
-    private LinkedHashMap<Long, CacheEntry> tenuredMemoryEntries = new LinkedHashMap<Long, CacheEntry>(16, .75f, true);
+    //combined recency/frequency lambda value between 0 and 1; lower -> LFU, higher -> LRU
+    //TODO: adaptively adjust this value.  more hits should move closer to lru
+    private final double crfLamda = .0002;
+    //implements an LRFU cache using a customized crf function.  we store the value with
+    //the cache entry to make a better decision about reuse of the batch
+    //TODO: consider the size estimate in the weighting function
+    private OrderedCache<Long, CacheEntry> memoryEntries = new OrderedCache<Long, CacheEntry>() { //no comparator is passed, so entries sort by CacheEntry.compareTo
+		
+		@Override
+		protected void recordAccess(Long key, CacheEntry value, boolean initial) { //updates the entry's lastAccess and orderingValue
+			long lastAccess = value.getLastAccess(); //previous access stamp
+			value.setLastAccess(readAttempts.get()); //stamp with the current readAttempts count
+			if (initial && lastAccess == 0) { //first insert of a never-stamped entry: leave orderingValue untouched
+				return;
+			}
+			double orderingValue = value.getOrderingValue();
+			orderingValue = 
+				//Frequency component: the old value decayed by (1-crfLamda)^(current stamp - previous stamp)
+				orderingValue*Math.pow(1-crfLamda, value.getLastAccess() - lastAccess)
+				//recency component: (current stamp)^crfLamda
+				+ Math.pow(value.getLastAccess(), crfLamda);
+			value.setOrderingValue(orderingValue);
+		}
+	};
+	
+	//private LinkedHashMap<Long, CacheEntry> memoryEntries = new LinkedHashMap<Long, CacheEntry>();
     
     //limited size reference caches based upon the memory settings
     private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache; 
@@ -510,38 +546,26 @@
     			if (memoryCount < getMaxReserveKB() / 8) {
 					DataTypeManager.setValueCacheEnabled(false);
 				}
-			} else if (memoryCount > getMaxReserveKB() / 4) {
+			} else if (memoryCount > getMaxReserveKB() / 2) {
 				DataTypeManager.setValueCacheEnabled(true);
 			}
 			return;
 		}
-		boolean first = true;
+		int maxToFree = Math.max(maxProcessingKB>>1, reserveBatchKB>>3);
+		int freed = 0;
 		while (true) {
 			CacheEntry ce = null;
 			synchronized (memoryEntries) {
-				if (activeBatchKB.get() == 0 || activeBatchKB.get() < reserveBatchKB * .8) {
+				if (freed > maxToFree || activeBatchKB.get() == 0 || activeBatchKB.get() < reserveBatchKB * .8) {
 					break;
 				}
-				if (first) { //let one entry per persist cycle loose its tenure.  this helps us be more write avoident.
-					first = false;
-					if (!tenuredMemoryEntries.isEmpty()) {
-						Iterator<Map.Entry<Long, CacheEntry>> iter = tenuredMemoryEntries.entrySet().iterator();
-						Map.Entry<Long, CacheEntry> entry = iter.next();
-						iter.remove();
-						memoryEntries.put(entry.getKey(), entry.getValue());
-					}
+				ce = memoryEntries.evict();
+				freed += ce.getSizeEstimate();
+				activeBatchKB.addAndGet(-ce.getSizeEstimate());
+				if (ce.isPersistent()) {
+					continue;
 				}
-				LinkedHashMap<Long, CacheEntry> toDrain = memoryEntries;
-				if (memoryEntries.isEmpty()) {
-					toDrain = tenuredMemoryEntries;
-					if (tenuredMemoryEntries.isEmpty()) {
-						break;
-					}
-				}
-				Iterator<CacheEntry> iter = toDrain.values().iterator();
-				ce = iter.next();
-				iter.remove();
-				activeBatchKB.addAndGet(-ce.getSizeEstimate());
+				ce.setPersistent(true);
 			}
 			persist(ce);
 		}
@@ -552,18 +576,15 @@
 		if (s == null) {
 			return;
 		}
-		if (!ce.isPersistent()) {
-			long count = writeCount.incrementAndGet();
-			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
-				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
-			}
-			try {
-				cache.add(ce, s);
-			} catch (Throwable e) {
-				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
-			}
-			ce.setPersistent(true);
+		long count = writeCount.incrementAndGet();
+		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+			LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
 		}
+		try {
+			cache.add(ce, s);
+		} catch (Throwable e) {
+			LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+		}
 		if (s.useSoftCache()) {
 			createSoftReference(ce);
 		} else if (useWeakReferences) {
@@ -584,18 +605,9 @@
 		CacheEntry ce = null;
 		synchronized (memoryEntries) {
 			if (retain) {
-				ce = tenuredMemoryEntries.get(batch);
-				if (ce == null) {
-					ce = memoryEntries.remove(batch);
-					if (ce != null) {
-						tenuredMemoryEntries.put(batch, ce);
-					}
-				} 
+				ce = memoryEntries.get(batch);
 			} else {
-				ce = tenuredMemoryEntries.remove(batch);
-				if (ce == null) {
-					ce = memoryEntries.remove(batch);
-				}
+				ce = memoryEntries.remove(batch);
 			}
 		}
 		if (ce != null) {
@@ -621,7 +633,7 @@
 		if (ce != null && ce.getObject() != null) {
 			referenceHit.getAndIncrement();
 			if (retain) {
-				addMemoryEntry(ce);
+				addMemoryEntry(ce, null);
 			} else {
 				BufferManagerImpl.this.remove(ce, false);
 			}
@@ -630,7 +642,7 @@
 		return null;
 	}
 	
-	void remove(Long gid, Long batch, boolean prefersMemory) {
+	CacheEntry remove(Long gid, Long batch, boolean prefersMemory) {
 		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 			LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Removing batch from BufferManager", batch); //$NON-NLS-1$
 		}
@@ -640,6 +652,7 @@
 		} else {
 			ce.nullOut();
 		}
+		return ce;
 	}
 
 	private void remove(CacheEntry ce, boolean inMemory) {
@@ -652,18 +665,12 @@
 		}
 	}
 	
-	Long addCacheEntry(CacheEntry ce, Serializer<?> s) {
-		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-			LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
-		}
-		cache.addToCacheGroup(s.getId(), ce.getId());
-		addMemoryEntry(ce);
-		return ce.getId();
-	}
-	
-	void addMemoryEntry(CacheEntry ce) {
+	void addMemoryEntry(CacheEntry ce, CacheEntry previous) {
 		persistBatchReferences();
 		synchronized (memoryEntries) {
+			if (previous != null) {
+				ce.setOrderingValue(previous.getOrderingValue());
+			}
 			memoryEntries.put(ce.getId(), ce);
 		}
 		activeBatchKB.getAndAdd(ce.getSizeEstimate());
@@ -782,7 +789,7 @@
 		this.inlineLobs = inlineLobs;
 	}
 
-	private int getMaxReserveKB() {
+	public int getMaxReserveKB() {
 		return maxReserveKB.get();
 	}
 	
@@ -790,4 +797,12 @@
 		this.cache = cache;
 	}
 	
+	public int getActiveBatchKB() { //current value of the activeBatchKB counter
+		return activeBatchKB.get();
+	}
+	
+	public int getMemoryCacheEntries() { //number of entries currently held in memoryEntries
+		return memoryEntries.size(); //NOTE(review): read without synchronized (memoryEntries), unlike the other accesses visible in this diff
+	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -235,6 +235,8 @@
 			};
 	        ObjectOutputStream oos = new ObjectOutputStream(fsos);
 	        oos.writeInt(entry.getSizeEstimate());
+	        oos.writeLong(entry.getLastAccess());
+	        oos.writeDouble(entry.getOrderingValue());
 	        s.serialize(entry.getObject(), oos);
 	        oos.close();
 	        long size = fsos.getBytesWritten();
@@ -270,6 +272,8 @@
 			ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(group.store.createInputStream(info[0]), IO_BUFFER_SIZE));
 			CacheEntry ce = new CacheEntry(id);
 			ce.setSizeEstimate(ois.readInt());
+			ce.setLastAccess(ois.readLong());
+			ce.setOrderingValue(ois.readDouble());
 			ce.setObject(serializer.deserialize(ois));
 			return ce;
         } catch(IOException e) {

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public abstract class OrderedCache<K, V> { //key lookup through a HashMap plus a sorted TreeMap of the values that decides eviction order
+	
+	protected HashMap<K, V> map = new HashMap<K, V>(); //key -> value
+	protected TreeMap<V, K> expirationQueue; //value -> key in sorted order; evict() takes the first entry
+	
+	public OrderedCache() { //values are ordered by their natural ordering
+		expirationQueue = new TreeMap<V, K>();
+	}
+	
+	public OrderedCache(Comparator<? super V> comparator) { //values are ordered by the given comparator
+		expirationQueue = new TreeMap<V, K>(comparator);
+	}
+	
+	public V get(K key) { //returns the cached value or null; a hit is reported to recordAccess with initial == false
+		V result = map.get(key);
+		if (result != null) {
+			expirationQueue.remove(result); //taken out before recordAccess, presumably because recordAccess may change how the value sorts
+			recordAccess(key, result, false);
+			expirationQueue.put(result, key); //re-inserted at its (possibly new) position
+		}
+		return result;
+	}
+	
+	public V remove(K key) { //removes the mapping from both structures; returns the removed value or null
+		V result = map.remove(key);
+		if (result != null) {
+			expirationQueue.remove(result);
+		}
+		return result;
+	}
+	
+	public V put(K key, V value) { //adds or replaces the mapping; returns the previous value or null
+		V result = map.put(key, value);
+		if (result != null) {
+			expirationQueue.remove(result); //drop the replaced value from the ordering
+		}
+		recordAccess(key, value, result == null); //initial is true when the key had no previous mapping
+		expirationQueue.put(value, key);
+		return result;
+	}
+	
+	public V evict() { //removes and returns the first (lowest-ordered) value, or null when the cache is empty
+		Map.Entry<V, K> entry = expirationQueue.pollFirstEntry();
+		if (entry == null) {
+			return null;
+		}
+		map.remove(entry.getValue()); //the queue entry's value is the cache key
+		return entry.getKey();
+	}
+	
+	public int size() { //number of mappings in map
+		return map.size();
+	}
+	
+	protected abstract void recordAccess(K key, V value, boolean initial); //hook called by get (initial == false) and put, while the value is not in expirationQueue
+	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -74,11 +74,7 @@
 		this.types = types;
 	}
 	
-    public long getBatchSize(List<? extends List<?>> data) {
-    	return getBatchSize(DataTypeManager.isValueCacheEnabled(), data);
-    }
-	
-    private long getBatchSize(boolean accountForValueCache, List<? extends List<?>> data) {
+    public long getBatchSize(boolean accountForValueCache, List<? extends List<?>> data) {
         int colLength = types.length;
         int rowLength = data.size();
     

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java	2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java	2011-10-03 03:03:18 UTC (rev 3509)
@@ -163,7 +163,7 @@
         
         String[] types = {"string", "integer", "boolean", "double", "string", "integer"};     //$NON-NLS-1$//$NON-NLS-2$//$NON-NLS-3$//$NON-NLS-4$ //$NON-NLS-5$//$NON-NLS-6$
 
-        long actualSize = new SizeUtility(types).getBatchSize(Arrays.asList(expected));
+        long actualSize = new SizeUtility(types).getBatchSize(false, Arrays.asList(expected));
         assertEquals("Got unexpected size: ", 2667, actualSize); //$NON-NLS-1$        
     }
     



More information about the teiid-commits mailing list