[teiid-commits] teiid SVN: r3494 - in branches/7.4.x/engine/src: test/java/org/teiid/common/buffer and 1 other directory.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Wed Sep 14 13:57:13 EDT 2011
Author: shawkins
Date: 2011-09-14 13:57:13 -0400 (Wed, 14 Sep 2011)
New Revision: 3494
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
Log:
TEIID-1753 addressing compaction issues
Modified: branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-13 23:22:55 UTC (rev 3493)
+++ branches/7.4.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-14 17:57:13 UTC (rev 3494)
@@ -36,12 +36,14 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -95,7 +97,8 @@
private final class BatchManagerImpl implements BatchManager {
private final String id;
private volatile FileStore store;
- private Map<Long, long[]> physicalMapping = new ConcurrentHashMap<Long, long[]>();
+ private Map<Long, long[]> physicalMapping = new HashMap<Long, long[]>();
+ private ConcurrentSkipListSet<Long> freed = new ConcurrentSkipListSet<Long>();
private ReadWriteLock compactionLock = new ReentrantReadWriteLock();
private AtomicLong unusedSpace = new AtomicLong();
private int[] lobIndexes;
@@ -109,6 +112,13 @@
this.sizeUtility = new SizeUtility();
}
+ private void freeBatch(Long id) {
+ long[] info = physicalMapping.remove(id);
+ if (info != null) {
+ unusedSpace.addAndGet(info[1]);
+ }
+ }
+
public FileStore createStorage(String prefix) {
return createFileStore(id+prefix);
}
@@ -123,7 +133,7 @@
}
private boolean shouldCompact(long offset) {
- return offset > COMPACTION_THRESHOLD && unusedSpace.get() * 4 > offset * 3;
+ return offset > compactionThreshold && unusedSpace.get() * 4 > offset * 3;
}
private long getOffset() throws TeiidComponentException {
@@ -131,45 +141,38 @@
if (!shouldCompact(offset)) {
return offset;
}
- try {
- this.compactionLock.writeLock().lock();
- offset = store.getLength();
- //retest the condition to ensure that compaction is still needed
- if (!shouldCompact(offset)) {
- return offset;
+ offset = store.getLength();
+ FileStore newStore = createFileStore(id);
+ newStore.setCleanupReference(this);
+ byte[] buffer = new byte[IO_BUFFER_SIZE];
+ List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
+ Collections.sort(values, new Comparator<long[]>() {
+ @Override
+ public int compare(long[] o1, long[] o2) {
+ return Long.signum(o1[0] - o2[0]);
}
- FileStore newStore = createFileStore(id);
- newStore.setCleanupReference(this);
- byte[] buffer = new byte[IO_BUFFER_SIZE];
- List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
- Collections.sort(values, new Comparator<long[]>() {
- @Override
- public int compare(long[] o1, long[] o2) {
- return Long.signum(o1[0] - o2[0]);
- }
- });
- for (long[] info : values) {
- long oldOffset = info[0];
- info[0] = newStore.getLength();
- int size = (int)info[1];
- while (size > 0) {
- int toWrite = Math.min(IO_BUFFER_SIZE, size);
- store.readFully(oldOffset, buffer, 0, toWrite);
- newStore.write(buffer, 0, toWrite);
- size -= toWrite;
- }
+ });
+ for (long[] info : values) {
+ long oldOffset = info[0];
+ info[0] = newStore.getLength();
+ int size = (int)info[1];
+ while (size > 0) {
+ int toWrite = Math.min(IO_BUFFER_SIZE, size);
+ store.readFully(oldOffset, buffer, 0, toWrite);
+ newStore.write(buffer, 0, toWrite);
+ size -= toWrite;
+ oldOffset += toWrite;
}
- store.remove();
- store = newStore;
- long oldOffset = offset;
- offset = store.getLength();
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- }
- return offset;
- } finally {
- this.compactionLock.writeLock().unlock();
}
+ store.remove();
+ store = newStore;
+ long oldOffset = offset;
+ offset = store.getLength();
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ }
+ unusedSpace.set(0);
+ return offset;
}
@Override
@@ -298,7 +301,6 @@
try {
this.batchManager.compactionLock.readLock().lock();
long[] info = batchManager.physicalMapping.get(this.id);
- Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
batch = new TupleBatch();
batch.setDataTypes(types);
@@ -341,16 +343,20 @@
lobManager.updateReferences(batchManager.lobIndexes, tuple);
}
}
- synchronized (batchManager.store) {
- offset = batchManager.getOffset();
- OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
- ObjectOutputStream oos = new ObjectOutputStream(fsos);
- batch.writeExternal(oos);
- oos.close();
- long size = batchManager.store.getLength() - offset;
- long[] info = new long[] {offset, size};
- batchManager.physicalMapping.put(this.id, info);
+ batchManager.compactionLock.writeLock().lock();
+ lockheld = true;
+ Long free = null;
+ while ((free = batchManager.freed.pollFirst()) != null) {
+ batchManager.freeBatch(free);
}
+ offset = batchManager.getOffset();
+ OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ batch.writeExternal(oos);
+ oos.close();
+ long size = batchManager.store.getLength() - offset;
+ long[] info = new long[] {offset, size};
+ batchManager.physicalMapping.put(this.id, info);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "batch written starting at:", offset); //$NON-NLS-1$
}
@@ -383,12 +389,17 @@
}
}
}
- long[] info = batchManager.physicalMapping.remove(id);
- if (info != null) {
- batchManager.unusedSpace.addAndGet(info[1]);
- }
activeBatch = null;
batchReference = null;
+ if (batchManager.compactionLock.writeLock().tryLock()) {
+ try {
+ batchManager.freeBatch(id);
+ } finally {
+ batchManager.compactionLock.writeLock().unlock();
+ }
+ } else {
+ batchManager.freed.add(id);
+ }
}
@Override
@@ -407,6 +418,7 @@
private volatile int reserveBatchKB;
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
private boolean useWeakReferences = true;
+ private int compactionThreshold = COMPACTION_THRESHOLD;
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
@@ -642,7 +654,7 @@
try {
mb.persist();
} catch (TeiidComponentException e) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
}
}
}
@@ -651,8 +663,6 @@
public int getSchemaSize(List<? extends Expression> elements) {
int total = 0;
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
- //we make a assumption that the average column size under 64bits is approximately 128bytes
- //this includes alignment, row/array, and reference overhead
for (Expression element : elements) {
Class<?> type = element.getType();
total += SizeUtility.getSize(isValueCacheEnabled, type);
@@ -703,4 +713,8 @@
this.useWeakReferences = useWeakReferences;
}
+ public void setCompactionThreshold(int compactionThreshold) {
+ this.compactionThreshold = compactionThreshold;
+ }
+
}
Modified: branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-13 23:22:55 UTC (rev 3493)
+++ branches/7.4.x/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-14 17:57:13 UTC (rev 3494)
@@ -100,5 +100,34 @@
}
}
+
+ /**
+ * Forces the logic through several compaction cycles by using large strings
+ * @throws TeiidComponentException
+ */
+ @Test public void testCompaction() throws TeiidComponentException {
+ BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+ bm.setProcessorBatchSize(32);
+ bm.setMaxReserveKB(0);//force all to disk
+ bm.setCompactionThreshold(0);
+ bm.initialize();
+
+ ElementSymbol e1 = new ElementSymbol("x");
+ e1.setType(String.class);
+ List<ElementSymbol> elements = Arrays.asList(e1);
+ STree map = bm.createSTree(elements, "1", 1);
+
+ int size = 1000;
+
+ for (int i = 0; i < size; i++) {
+ assertNull(map.insert(Arrays.asList(new String(new byte[1000])), InsertMode.ORDERED, size));
+ assertEquals(i + 1, map.getRowCount());
+ }
+
+ for (int i = 0; i < size; i++) {
+ assertNotNull(map.remove(Arrays.asList(new String(new byte[1000]))));
+ }
+
+ }
}
More information about the teiid-commits
mailing list