Author: shawkins
Date: 2010-01-29 17:29:27 -0500 (Fri, 29 Jan 2010)
New Revision: 1788
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
Log:
TEIID-913 updating the BufferManager to defer more writes.
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-29 20:42:18 UTC (rev 1787)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleBuffer.java	2010-01-29 22:29:27 UTC (rev 1788)
@@ -246,10 +246,7 @@
}
public void close() throws MetaMatrixComponentException {
- //if there is only a single batch, let it stay in memory
- if (!this.batches.isEmpty()) {
- saveBatch(true, false);
- }
+ saveBatch(true, false);
this.isFinal = true;
}
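
Note on the removed guard: the old code pinned a lone batch in memory at close, while with this change in-memory retention is decided centrally by BufferManagerImpl below, so close() saves unconditionally. This relies on saveBatch being a no-op when nothing is buffered, which the guard's removal implies but this diff does not show. A minimal sketch of that assumed precondition (hypothetical body and parameter names; only the call site appears here):

    void saveBatch(boolean flushAll, boolean force) throws MetaMatrixComponentException {
        if (this.batches.isEmpty()) {
            return; // nothing buffered; close() can now call this unconditionally
        }
        // ... existing logic: hand the remaining rows to the BufferManager,
        // which defers the actual disk write until memory pressure requires it ...
    }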
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-01-29 20:42:18 UTC (rev 1787)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2010-01-29 22:29:27 UTC (rev 1788)
@@ -31,10 +31,13 @@
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -69,12 +72,37 @@
/**
* <p>Default implementation of BufferManager.</p>
- * Responsible for creating/tracking TupleBuffers and providing access to the StorageManager
+ * Responsible for creating/tracking TupleBuffers and providing access to the StorageManager.
+ * <p>
+ * The buffering strategy attempts to purge batches from the least recently used TupleBuffer,
+ * starting just before the last used batch and wrapping around circularly when nothing
+ * precedes it. This compensates for our tendency to read buffers in a forward manner. If the
+ * processing algorithms are changed to use alternating ascending/descending access, this
+ * buffering approach could be replaced with a simple LRU.
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
private static final int IO_BUFFER_SIZE = 1 << 14;
+ /**
+ * Holder for active batches
+ */
+ private class TupleBufferInfo {
+ TreeMap<Integer, ManagedBatchImpl> batches = new TreeMap<Integer, ManagedBatchImpl>();
+ Integer lastUsed = null;
+
+ ManagedBatchImpl removeBatch(int row) {
+ ManagedBatchImpl result = batches.remove(row);
+ if (result != null) {
+ activeBatchCount--;
+ if (toPersistCount > 0) {
+ toPersistCount--;
+ }
+ }
+ return result;
+ }
+ }
+
private final class ManagedBatchImpl implements ManagedBatch {
final private String id;
final private FileStore store;
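
To make the javadoc's victim-selection rule concrete: within the least recently used TupleBuffer, the batch chosen for persistence is the one with the greatest beginRow before lastUsed, wrapping to the highest-row batch when nothing precedes it. A self-contained sketch of that TreeMap lookup (standalone illustration, not engine code; the real loop in persistBatchReferences below removes the chosen entry rather than just peeking):

    import java.util.Map;
    import java.util.TreeMap;

    public class VictimSelection {
        /** Pick the batch to persist: the entry just before lastUsed, wrapping circularly. */
        static Integer pickVictim(TreeMap<Integer, String> batches, Integer lastUsed) {
            Map.Entry<Integer, String> entry = null;
            if (lastUsed != null) {
                entry = batches.floorEntry(lastUsed - 1); // greatest key <= lastUsed - 1
            }
            if (entry == null) {
                entry = batches.lastEntry(); // nothing precedes lastUsed: wrap to the end
            }
            return entry == null ? null : entry.getKey();
        }

        public static void main(String[] args) {
            TreeMap<Integer, String> batches = new TreeMap<Integer, String>();
            batches.put(1, "b1");
            batches.put(101, "b2");
            batches.put(201, "b3");
            System.out.println(pickVictim(batches, 201)); // 101: just before the last used batch
            System.out.println(pickVictim(batches, 1));   // 201: wraps around circularly
        }
    }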
@@ -83,23 +111,57 @@
private boolean persistent;
private volatile TupleBatch pBatch;
private Reference<TupleBatch> batchReference;
+ private int beginRow;
public ManagedBatchImpl(String id, FileStore store, TupleBatch batch) throws MetaMatrixComponentException {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", batchAdded.incrementAndGet()); //$NON-NLS-1$
this.id = id;
this.store = store;
this.pBatch = batch;
- if (batch.getBeginRow() == 1) {
- activeBatches.add(this);
- } else {
- this.persist(false);
- }
+ this.beginRow = batch.getBeginRow();
+ addToCache(false);
persistBatchReferences();
}
+ private void addToCache(boolean update) {
+ synchronized (activeBatches) {
+ activeBatchCount++;
+ TupleBufferInfo tbi = null;
+ if (update) {
+ tbi = activeBatches.remove(this.id);
+ } else {
+ tbi = activeBatches.get(this.id);
+ }
+ if (tbi == null) {
+ tbi = new TupleBufferInfo();
+ update = true;
+ }
+ if (update) {
+ activeBatches.put(this.id, tbi);
+ }
+ Assertion.isNull(tbi.batches.put(this.beginRow, this));
+ }
+ }
+
@Override
public TupleBatch getBatch(boolean cache, String[] types) throws MetaMatrixComponentException {
- readAttempts.getAndIncrement();
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk", readAttempts.incrementAndGet(), "reference hits", referenceHit.get()); //$NON-NLS-1$ //$NON-NLS-2$
+ synchronized (activeBatches) {
+ TupleBufferInfo tbi = activeBatches.remove(this.id);
+ if (tbi != null) {
+ boolean put = true;
+ if (!cache) {
+ tbi.removeBatch(this.beginRow);
+ if (tbi.batches.isEmpty()) {
+ put = false;
+ }
+ }
+ if (put) {
+ tbi.lastUsed = this.beginRow;
+ activeBatches.put(this.id, tbi);
+ }
+ }
+ }
synchronized (this) {
if (this.batchReference != null && this.pBatch == null) {
TupleBatch result = this.batchReference.get();
@@ -108,23 +170,19 @@
softCache.remove(this);
this.batchReference.clear();
}
+ referenceHit.getAndIncrement();
return result;
}
}
TupleBatch batch = this.pBatch;
if (batch != null){
- activeBatches.remove(this);
- if (cache) {
- activeBatches.add(this);
- }
return batch;
}
}
persistBatchReferences();
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reading batch from disk", readCount.incrementAndGet()); //$NON-NLS-1$
synchronized (this) {
- //Resurrect from disk
try {
ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(store.createInputStream(this.offset), IO_BUFFER_SIZE));
TupleBatch batch = new TupleBatch();
@@ -133,7 +191,7 @@
batch.setDataTypes(null);
if (cache) {
this.pBatch = batch;
- activeBatches.add(this);
+ addToCache(true);
}
return batch;
} catch(IOException e) {
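
The batchReference branch near the top of getBatch resurrects a persisted batch from memory instead of re-reading it from disk; the new referenceHit counter (declared at the bottom of this diff) tracks how often that succeeds. A minimal sketch of the pattern (illustrative names; whether the engine hands out soft or weak references for a given batch is decided by the createSoft flag and is not visible in this hunk):

    import java.lang.ref.Reference;
    import java.lang.ref.SoftReference;
    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicInteger;

    class ResurrectableBatch<T> {
        private Reference<T> ref; // set when the batch is persisted
        private final AtomicInteger referenceHit = new AtomicInteger();

        void persisted(T batch) {
            // Keep a GC-clearable handle so a later read may skip the disk entirely
            this.ref = new SoftReference<T>(batch);
        }

        T get(Callable<T> diskRead) throws Exception {
            if (ref != null) {
                T batch = ref.get();
                if (batch != null) {
                    referenceHit.incrementAndGet(); // resurrected from memory
                    return batch;
                }
            }
            return diskRead.call(); // reference was collected; fall back to disk
        }
    }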
@@ -145,19 +203,55 @@
}
public void persistBatchReferences() throws MetaMatrixComponentException {
- persistOneBatch(softCache, reserveBatches * 2, false);
- persistOneBatch(activeBatches, reserveBatches, true);
- }
-
- private void persistOneBatch(Set<ManagedBatchImpl> set, int requiredSize, boolean createSoft) throws MetaMatrixComponentException {
ManagedBatchImpl mb = null;
- synchronized (set) {
- if (set.size() > requiredSize) {
- Iterator<ManagedBatchImpl> iter = set.iterator();
+ boolean createSoft = false;
+ /*
+ * If we are over our limit, collect half of the batches.
+ */
+ synchronized (activeBatches) {
+ if (activeBatchCount > reserveBatches && toPersistCount == 0) {
+ toPersistCount = activeBatchCount / 2;
+ }
+ }
+ while (true) {
+ synchronized (activeBatches) {
+ if (activeBatchCount == 0 || toPersistCount == 0) {
+ toPersistCount = 0;
+ break;
+ }
+ Iterator<TupleBufferInfo> iter = activeBatches.values().iterator();
+ TupleBufferInfo tbi = iter.next();
+ Map.Entry<Integer, ManagedBatchImpl> entry = null;
+ if (tbi.lastUsed != null) {
+ entry = tbi.batches.floorEntry(tbi.lastUsed - 1);
+ }
+ if (entry == null) {
+ entry = tbi.batches.pollLastEntry();
+ } else {
+ createSoft = true;
+ tbi.batches.remove(entry.getKey());
+ }
+ if (tbi.batches.isEmpty()) {
+ iter.remove();
+ }
+ activeBatchCount--;
+ toPersistCount--;
+ mb = entry.getValue();
+ }
+ persist(createSoft, mb);
+ }
+ synchronized (softCache) {
+ if (softCache.size() > reserveBatches) {
+ Iterator<ManagedBatchImpl> iter = softCache.iterator();
mb = iter.next();
iter.remove();
}
}
+ persist(false, mb);
+ }
+
+ private void persist(boolean createSoft, ManagedBatchImpl mb)
+ throws MetaMatrixComponentException {
try {
if (mb != null) {
mb.persist(createSoft);
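
activeBatches is a plain insertion-ordered LinkedHashMap, so LRU ordering is maintained by hand: getBatch removes and re-puts the TupleBufferInfo on access, and the eviction loop above takes the iterator's first value as the least recently used buffer. A standalone sketch of that idiom:

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ManualLru {
        public static void main(String[] args) {
            Map<String, String> lru = new LinkedHashMap<String, String>();
            lru.put("a", "1");
            lru.put("b", "2");
            lru.put("c", "3");

            // Touch "a": remove + re-put moves it to the tail (most recently used)
            lru.put("a", lru.remove("a"));

            // Evict: the iteration head is now the least recently used entry
            Iterator<Map.Entry<String, String>> iter = lru.entrySet().iterator();
            Map.Entry<String, String> victim = iter.next();
            iter.remove();
            System.out.println("evicted " + victim.getKey()); // prints: evicted b
        }
    }

An access-ordered LinkedHashMap (the three-argument constructor with accessOrder=true) would track this automatically, but the manual form lets the code decide which reads count as use; for example, a getBatch call with cache=false removes the batch instead of promoting it.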
@@ -202,7 +296,12 @@
}
public void remove() {
- activeBatches.remove(this);
+ synchronized (activeBatches) {
+ TupleBufferInfo tbi = activeBatches.get(this.id);
+ if (tbi != null && tbi.removeBatch(this.beginRow) != null && tbi.batches.isEmpty()) {
+ activeBatches.remove(this.id);
+ }
+ }
softCache.remove(this);
pBatch = null;
if (batchReference != null) {
@@ -222,11 +321,13 @@
private int maxProcessingBatches = BufferManager.DEFAULT_MAX_PROCESSING_BATCHES;
private int reserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
private int maxReserveBatches = BufferManager.DEFAULT_RESERVE_BUFFERS;
-
+
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
- private Set<ManagedBatchImpl> activeBatches = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
+ private int toPersistCount = 0;
+ private int activeBatchCount = 0;
+ private Map<String, TupleBufferInfo> activeBatches = new LinkedHashMap<String, TupleBufferInfo>();
private Set<ManagedBatchImpl> softCache = Collections.synchronizedSet(new LinkedHashSet<ManagedBatchImpl>());
private StorageManager diskMgr;
@@ -236,6 +337,7 @@
private AtomicInteger readCount = new AtomicInteger();
private AtomicInteger writeCount = new AtomicInteger();
private AtomicInteger readAttempts = new AtomicInteger();
+ private AtomicInteger referenceHit = new AtomicInteger();
public int getMaxProcessingBatches() {
return maxProcessingBatches;