[teiid-commits] teiid SVN: r3509 - in trunk: engine/src/main/java/org/teiid/common/buffer and 2 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Sun Oct 2 23:03:18 EDT 2011
Author: shawkins
Date: 2011-10-02 23:03:18 -0400 (Sun, 02 Oct 2011)
New Revision: 3509
Added:
trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
Modified:
trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java
Log:
TEIID-1750 switching to an LRFU algorithm for caching and preserving the ordering value. Also preventing test errors in TestSizeUtility. Adding a ResizingArrayList to better shape batches in memory.
Modified: trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/BatchSerializer.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/client/src/main/java/org/teiid/client/BatchSerializer.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -50,7 +50,7 @@
private static ColumnSerializer defaultSerializer = new ColumnSerializer();
- private static final Map<String, ColumnSerializer> serializers = new HashMap<String, ColumnSerializer>();
+ private static final Map<String, ColumnSerializer> serializers = new HashMap<String, ColumnSerializer>(128);
static {
serializers.put(DataTypeManager.DefaultDataTypes.BIG_DECIMAL, new BigDecimalColumnSerializer());
serializers.put(DataTypeManager.DefaultDataTypes.BIG_INTEGER, new BigIntegerColumnSerializer());
@@ -394,7 +394,7 @@
return new ArrayList<List<Object>>(0);
} else if (rows > 0) {
int columns = in.readInt();
- List<List<Object>> batch = new ArrayList<List<Object>>(rows);
+ List<List<Object>> batch = new ResizingArrayList<List<Object>>(rows);
int numBytes = rows/8;
int extraRows = rows % 8;
for (int currentRow = 0; currentRow < rows; currentRow++) {
Added: trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java (rev 0)
+++ trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -0,0 +1,168 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.client;
+
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.RandomAccess;
+
+/**
+ * Modified {@link ArrayList} that resizes up and down by powers of 2.
+ *
+ * @param <T>
+ */
+@SuppressWarnings("unchecked")
+public class ResizingArrayList<T> extends AbstractList<T> implements RandomAccess {
+
+ public static final int MIN_SHRINK_SIZE = 32;
+
+ protected Object[] elementData;
+ protected int size;
+
+ public ResizingArrayList() {
+ this(MIN_SHRINK_SIZE);
+ }
+
+ public ResizingArrayList(int initialCapacity) {
+ this.elementData = new Object[initialCapacity];
+ }
+
+ public ResizingArrayList(Collection<? extends T> c) {
+ elementData = c.toArray();
+ size = elementData.length;
+ // c.toArray might (incorrectly) not return Object[] (see 6260652)
+ if (elementData.getClass() != Object[].class)
+ elementData = Arrays.copyOf(elementData, size, Object[].class);
+ }
+
+ @Override
+ public T get(int index) {
+ rangeCheck(index, false);
+ return (T) elementData[index];
+ }
+
+ public void add(int index, T element) {
+ rangeCheck(index, true);
+ ensureCapacity(size+1);
+ System.arraycopy(elementData, index, elementData, index + 1,
+ size - index);
+ elementData[index] = element;
+ size++;
+ }
+
+ protected void ensureCapacity(int capacity) {
+ if (capacity <= elementData.length) {
+ return;
+ }
+ int newCapacity = 1 << (32 - Integer.numberOfLeadingZeros(capacity - 1));
+ int lowerCapacity = newCapacity*70/99; //SQRT(2)
+ if (lowerCapacity > capacity) {
+ newCapacity = lowerCapacity;
+ }
+ elementData = Arrays.copyOf(elementData, newCapacity);
+ }
+
+ public T set(int index, T element) {
+ rangeCheck(index, false);
+ T old = (T) elementData[index];
+ elementData[index] = element;
+ return old;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends T> c) {
+ return addAll(size, c);
+ }
+
+ @Override
+ public boolean addAll(int index, Collection<? extends T> c) {
+ rangeCheck(index, true);
+ int numNew = c.size();
+ ensureCapacity(size + numNew);
+ for (T t : c) {
+ elementData[index++] = t;
+ }
+ size += numNew;
+ return numNew != 0;
+ }
+
+ @Override
+ public T remove(int index) {
+ T oldValue = get(index);
+ int numMoved = size - index - 1;
+ if (numMoved > 0) {
+ System.arraycopy(elementData, index+1, elementData, index, numMoved);
+ }
+ elementData[--size] = null;
+ int halfLength = elementData.length/2;
+ if (size <= halfLength && elementData.length > MIN_SHRINK_SIZE) {
+ int newSize = Math.max(halfLength*99/70, MIN_SHRINK_SIZE);
+ Object[] next = new Object[newSize];
+ System.arraycopy(elementData, 0, next, 0, size);
+ elementData = next;
+ }
+ return oldValue;
+ }
+
+ private void rangeCheck(int index, boolean inclusive) {
+ if (index > size || (!inclusive && index == size)) {
+ throw new IndexOutOfBoundsException("Index: "+index+", Size: "+size); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ }
+
+ @Override
+ public void clear() {
+ if (size <= MIN_SHRINK_SIZE) {
+ for (int i = 0; i < size; i++) {
+ elementData[i] = null;
+ }
+ } else {
+ elementData = new Object[MIN_SHRINK_SIZE];
+ }
+ size = 0;
+ }
+
+ @Override
+ public Object[] toArray() {
+ return Arrays.copyOf(elementData, size);
+ }
+
+ public <U extends Object> U[] toArray(U[] a) {
+ if (a.length < size) {
+ return (U[]) Arrays.copyOf(elementData, size, a.getClass());
+ }
+ System.arraycopy(elementData, 0, a, 0, size);
+ if (a.length > size) {
+ a[size] = null;
+ }
+ return a;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+}
Property changes on: trunk/client/src/main/java/org/teiid/client/ResizingArrayList.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BatchManager.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -40,7 +40,7 @@
boolean prefersMemory();
- Long createManagedBatch(List<? extends List<?>> batch) throws TeiidComponentException;
+ Long createManagedBatch(List<? extends List<?>> batch, Long previous, boolean removeOld) throws TeiidComponentException;
void remove();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -24,10 +24,12 @@
import java.lang.ref.WeakReference;
-public class CacheEntry {
+public class CacheEntry implements Comparable<CacheEntry>{
private boolean persistent;
private Object object;
private int sizeEstimate;
+ private long lastAccess;
+ private double orderingValue;
private WeakReference<? extends Serializer<?>> serializer;
private Long id;
@@ -52,6 +54,34 @@
this.sizeEstimate = sizeEstimate;
}
+ public long getLastAccess() {
+ return lastAccess;
+ }
+
+ public void setLastAccess(long lastAccess) {
+ this.lastAccess = lastAccess;
+ }
+
+ public double getOrderingValue() {
+ return orderingValue;
+ }
+
+ public void setOrderingValue(double orderingValue) {
+ this.orderingValue = orderingValue;
+ }
+
+ @Override
+ public int compareTo(CacheEntry o) {
+ int result = (int) Math.signum(orderingValue - o.orderingValue);
+ if (result == 0) {
+ result = Long.signum(lastAccess - o.lastAccess);
+ if (result == 0) {
+ return Long.signum(id - o.id);
+ }
+ }
+ return result;
+ }
+
public boolean equals(Object obj) {
if (obj == this) {
return true;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -34,6 +34,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.teiid.client.ResizingArrayList;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
@@ -103,9 +104,9 @@
this.stree = stree;
this.id = counter.getAndIncrement();
stree.pages.put(this.id, this);
- this.values = new ArrayList<List<?>>();
+ this.values = new ResizingArrayList<List<?>>();
if (!leaf) {
- children = new ArrayList<SPage>();
+ children = new ResizingArrayList<SPage>();
}
}
@@ -186,23 +187,27 @@
if (values instanceof LightWeightCopyOnWriteList<?>) {
values = ((LightWeightCopyOnWriteList<List<?>>)values).getList();
}
- if (managedBatch != null && trackingObject == null) {
- stree.getBatchManager(children == null).remove(managedBatch);
- managedBatch = null;
- trackingObject = null;
- }
if (values.size() < MIN_PERSISTENT_SIZE) {
- this.values = values;
+ setDirectValues(values);
return;
} else if (stree.batchInsert && children == null && values.size() < stree.leafSize) {
- this.values = values;
+ setDirectValues(values);
stree.incompleteInsert = this;
return;
}
this.values = null;
+ managedBatch = stree.getBatchManager(children == null).createManagedBatch(values, managedBatch, trackingObject == null);
this.trackingObject = null;
- managedBatch = stree.getBatchManager(children == null).createManagedBatch(values);
}
+
+ private void setDirectValues(List<List<?>> values) {
+ if (managedBatch != null && trackingObject == null) {
+ stree.getBatchManager(children == null).remove(managedBatch);
+ managedBatch = null;
+ trackingObject = null;
+ }
+ this.values = values;
+ }
protected void remove(boolean force) {
if (managedBatch != null) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -22,12 +22,12 @@
package org.teiid.common.buffer;
-import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
+import org.teiid.client.ResizingArrayList;
import org.teiid.common.buffer.LobManager.ReferenceMode;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.types.DataTypeManager;
@@ -69,7 +69,7 @@
private int rowCount;
private boolean isFinal;
private TreeMap<Integer, Long> batches = new TreeMap<Integer, Long>();
- private ArrayList<List<?>> batchBuffer;
+ private List<List<?>> batchBuffer;
private boolean removed;
private boolean forwardOnly;
@@ -111,7 +111,7 @@
}
this.rowCount++;
if (batchBuffer == null) {
- batchBuffer = new ArrayList<List<?>>(batchSize/4);
+ batchBuffer = new ResizingArrayList<List<?>>(batchSize/4);
}
batchBuffer.add(tuple);
if (batchBuffer.size() == batchSize) {
@@ -178,7 +178,7 @@
if (batchBuffer == null || batchBuffer.isEmpty() || (!force && batchBuffer.size() < Math.max(1, batchSize / 32))) {
return;
}
- Long mbatch = manager.createManagedBatch(batchBuffer);
+ Long mbatch = manager.createManagedBatch(batchBuffer, null, false);
this.batches.put(rowCount - batchBuffer.size() + 1, mbatch);
batchBuffer = null;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -32,7 +32,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -79,8 +78,8 @@
* - this is not necessary for already persistent batches, since we hold a weak reference
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
-
- private final class BatchManagerImpl implements BatchManager, Serializer<List<? extends List<?>>> {
+
+ final class BatchManagerImpl implements BatchManager, Serializer<List<? extends List<?>>> {
final Long id;
SizeUtility sizeUtility;
private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
@@ -131,7 +130,8 @@
}
@Override
- public Long createManagedBatch(List<? extends List<?>> batch)
+ public Long createManagedBatch(List<? extends List<?>> batch,
+ Long previous, boolean removeOld)
throws TeiidComponentException {
int sizeEstimate = getSizeEstimate(batch);
Long oid = batchAdded.getAndIncrement();
@@ -139,7 +139,20 @@
ce.setObject(batch);
ce.setSizeEstimate(sizeEstimate);
ce.setSerializer(this.ref);
- return addCacheEntry(ce, this);
+ CacheEntry old = null;
+ if (previous != null) {
+ if (removeOld) {
+ old = BufferManagerImpl.this.remove(id, previous, prefersMemory.get());
+ } else {
+ old = fastGet(previous, prefersMemory.get(), true);
+ }
+ }
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ cache.addToCacheGroup(id, ce.getId());
+ addMemoryEntry(ce, old);
+ return oid;
}
@Override
@@ -166,7 +179,7 @@
}
public int getSizeEstimate(List<? extends List<?>> obj) {
- return (int) Math.max(1, sizeUtility.getBatchSize(obj) / 1024);
+ return (int) Math.max(1, sizeUtility.getBatchSize(DataTypeManager.isValueCacheEnabled(), obj) / 1024);
}
@SuppressWarnings("unchecked")
@@ -201,7 +214,7 @@
ce.setSerializer(this.ref);
ce.setPersistent(true);
if (retain) {
- addMemoryEntry(ce);
+ addMemoryEntry(ce, null);
}
}
return (List<List<?>>)ce.getObject();
@@ -259,9 +272,32 @@
private AtomicInteger activeBatchKB = new AtomicInteger();
- //tiered memory entries. the first tier is just a queue of adds/gets. once accessed again, the entry moves to the tenured tier.
- private LinkedHashMap<Long, CacheEntry> memoryEntries = new LinkedHashMap<Long, CacheEntry>(16, .75f, false);
- private LinkedHashMap<Long, CacheEntry> tenuredMemoryEntries = new LinkedHashMap<Long, CacheEntry>(16, .75f, true);
+ //combined recency/frequency lambda value between 0 and 1: lower -> LFU, higher -> LRU
+ //TODO: adaptively adjust this value. more hits should move closer to lru
+ private final double crfLamda = .0002;
+ //implements an LRFU cache using a customized crf function. we store the value with
+ //the cache entry to make a better decision about reuse of the batch
+ //TODO: consider the size estimate in the weighting function
+ private OrderedCache<Long, CacheEntry> memoryEntries = new OrderedCache<Long, CacheEntry>() {
+
+ @Override
+ protected void recordAccess(Long key, CacheEntry value, boolean initial) {
+ long lastAccess = value.getLastAccess();
+ value.setLastAccess(readAttempts.get());
+ if (initial && lastAccess == 0) {
+ return;
+ }
+ double orderingValue = value.getOrderingValue();
+ orderingValue =
+ //Frequency component
+ orderingValue*Math.pow(1-crfLamda, value.getLastAccess() - lastAccess)
+ //recency component
+ + Math.pow(value.getLastAccess(), crfLamda);
+ value.setOrderingValue(orderingValue);
+ }
+ };
+
+ //private LinkedHashMap<Long, CacheEntry> memoryEntries = new LinkedHashMap<Long, CacheEntry>();
//limited size reference caches based upon the memory settings
private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache;
@@ -510,38 +546,26 @@
if (memoryCount < getMaxReserveKB() / 8) {
DataTypeManager.setValueCacheEnabled(false);
}
- } else if (memoryCount > getMaxReserveKB() / 4) {
+ } else if (memoryCount > getMaxReserveKB() / 2) {
DataTypeManager.setValueCacheEnabled(true);
}
return;
}
- boolean first = true;
+ int maxToFree = Math.max(maxProcessingKB>>1, reserveBatchKB>>3);
+ int freed = 0;
while (true) {
CacheEntry ce = null;
synchronized (memoryEntries) {
- if (activeBatchKB.get() == 0 || activeBatchKB.get() < reserveBatchKB * .8) {
+ if (freed > maxToFree || activeBatchKB.get() == 0 || activeBatchKB.get() < reserveBatchKB * .8) {
break;
}
- if (first) { //let one entry per persist cycle loose its tenure. this helps us be more write avoident.
- first = false;
- if (!tenuredMemoryEntries.isEmpty()) {
- Iterator<Map.Entry<Long, CacheEntry>> iter = tenuredMemoryEntries.entrySet().iterator();
- Map.Entry<Long, CacheEntry> entry = iter.next();
- iter.remove();
- memoryEntries.put(entry.getKey(), entry.getValue());
- }
+ ce = memoryEntries.evict();
+ freed += ce.getSizeEstimate();
+ activeBatchKB.addAndGet(-ce.getSizeEstimate());
+ if (ce.isPersistent()) {
+ continue;
}
- LinkedHashMap<Long, CacheEntry> toDrain = memoryEntries;
- if (memoryEntries.isEmpty()) {
- toDrain = tenuredMemoryEntries;
- if (tenuredMemoryEntries.isEmpty()) {
- break;
- }
- }
- Iterator<CacheEntry> iter = toDrain.values().iterator();
- ce = iter.next();
- iter.remove();
- activeBatchKB.addAndGet(-ce.getSizeEstimate());
+ ce.setPersistent(true);
}
persist(ce);
}
@@ -552,18 +576,15 @@
if (s == null) {
return;
}
- if (!ce.isPersistent()) {
- long count = writeCount.incrementAndGet();
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
- }
- try {
- cache.add(ce, s);
- } catch (Throwable e) {
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
- }
- ce.setPersistent(true);
+ long count = writeCount.incrementAndGet();
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
}
+ try {
+ cache.add(ce, s);
+ } catch (Throwable e) {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+ }
if (s.useSoftCache()) {
createSoftReference(ce);
} else if (useWeakReferences) {
@@ -584,18 +605,9 @@
CacheEntry ce = null;
synchronized (memoryEntries) {
if (retain) {
- ce = tenuredMemoryEntries.get(batch);
- if (ce == null) {
- ce = memoryEntries.remove(batch);
- if (ce != null) {
- tenuredMemoryEntries.put(batch, ce);
- }
- }
+ ce = memoryEntries.get(batch);
} else {
- ce = tenuredMemoryEntries.remove(batch);
- if (ce == null) {
- ce = memoryEntries.remove(batch);
- }
+ ce = memoryEntries.remove(batch);
}
}
if (ce != null) {
@@ -621,7 +633,7 @@
if (ce != null && ce.getObject() != null) {
referenceHit.getAndIncrement();
if (retain) {
- addMemoryEntry(ce);
+ addMemoryEntry(ce, null);
} else {
BufferManagerImpl.this.remove(ce, false);
}
@@ -630,7 +642,7 @@
return null;
}
- void remove(Long gid, Long batch, boolean prefersMemory) {
+ CacheEntry remove(Long gid, Long batch, boolean prefersMemory) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Removing batch from BufferManager", batch); //$NON-NLS-1$
}
@@ -640,6 +652,7 @@
} else {
ce.nullOut();
}
+ return ce;
}
private void remove(CacheEntry ce, boolean inMemory) {
@@ -652,18 +665,12 @@
}
}
- Long addCacheEntry(CacheEntry ce, Serializer<?> s) {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
- }
- cache.addToCacheGroup(s.getId(), ce.getId());
- addMemoryEntry(ce);
- return ce.getId();
- }
-
- void addMemoryEntry(CacheEntry ce) {
+ void addMemoryEntry(CacheEntry ce, CacheEntry previous) {
persistBatchReferences();
synchronized (memoryEntries) {
+ if (previous != null) {
+ ce.setOrderingValue(previous.getOrderingValue());
+ }
memoryEntries.put(ce.getId(), ce);
}
activeBatchKB.getAndAdd(ce.getSizeEstimate());
@@ -782,7 +789,7 @@
this.inlineLobs = inlineLobs;
}
- private int getMaxReserveKB() {
+ public int getMaxReserveKB() {
return maxReserveKB.get();
}
@@ -790,4 +797,12 @@
this.cache = cache;
}
+ public int getActiveBatchKB() {
+ return activeBatchKB.get();
+ }
+
+ public int getMemoryCacheEntries() {
+ return memoryEntries.size();
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -235,6 +235,8 @@
};
ObjectOutputStream oos = new ObjectOutputStream(fsos);
oos.writeInt(entry.getSizeEstimate());
+ oos.writeLong(entry.getLastAccess());
+ oos.writeDouble(entry.getOrderingValue());
s.serialize(entry.getObject(), oos);
oos.close();
long size = fsos.getBytesWritten();
@@ -270,6 +272,8 @@
ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(group.store.createInputStream(info[0]), IO_BUFFER_SIZE));
CacheEntry ce = new CacheEntry(id);
ce.setSizeEstimate(ois.readInt());
+ ce.setLastAccess(ois.readLong());
+ ce.setOrderingValue(ois.readDouble());
ce.setObject(serializer.deserialize(ois));
return ce;
} catch(IOException e) {
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+public abstract class OrderedCache<K, V> {
+
+ protected HashMap<K, V> map = new HashMap<K, V>();
+ protected TreeMap<V, K> expirationQueue;
+
+ public OrderedCache() {
+ expirationQueue = new TreeMap<V, K>();
+ }
+
+ public OrderedCache(Comparator<? super V> comparator) {
+ expirationQueue = new TreeMap<V, K>(comparator);
+ }
+
+ public V get(K key) {
+ V result = map.get(key);
+ if (result != null) {
+ expirationQueue.remove(result);
+ recordAccess(key, result, false);
+ expirationQueue.put(result, key);
+ }
+ return result;
+ }
+
+ public V remove(K key) {
+ V result = map.remove(key);
+ if (result != null) {
+ expirationQueue.remove(result);
+ }
+ return result;
+ }
+
+ public V put(K key, V value) {
+ V result = map.put(key, value);
+ if (result != null) {
+ expirationQueue.remove(result);
+ }
+ recordAccess(key, value, result == null);
+ expirationQueue.put(value, key);
+ return result;
+ }
+
+ public V evict() {
+ Map.Entry<V, K> entry = expirationQueue.pollFirstEntry();
+ if (entry == null) {
+ return null;
+ }
+ map.remove(entry.getValue());
+ return entry.getKey();
+ }
+
+ public int size() {
+ return map.size();
+ }
+
+ protected abstract void recordAccess(K key, V value, boolean initial);
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -74,11 +74,7 @@
this.types = types;
}
- public long getBatchSize(List<? extends List<?>> data) {
- return getBatchSize(DataTypeManager.isValueCacheEnabled(), data);
- }
-
- private long getBatchSize(boolean accountForValueCache, List<? extends List<?>> data) {
+ public long getBatchSize(boolean accountForValueCache, List<? extends List<?>> data) {
int colLength = types.length;
int rowLength = data.size();
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java 2011-09-30 02:00:28 UTC (rev 3508)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java 2011-10-03 03:03:18 UTC (rev 3509)
@@ -163,7 +163,7 @@
String[] types = {"string", "integer", "boolean", "double", "string", "integer"}; //$NON-NLS-1$//$NON-NLS-2$//$NON-NLS-3$//$NON-NLS-4$ //$NON-NLS-5$//$NON-NLS-6$
- long actualSize = new SizeUtility(types).getBatchSize(Arrays.asList(expected));
+ long actualSize = new SizeUtility(types).getBatchSize(false, Arrays.asList(expected));
assertEquals("Got unexpected size: ", 2667, actualSize); //$NON-NLS-1$
}
More information about the teiid-commits
mailing list