[teiid-commits] teiid SVN: r3497 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 2 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Wed Sep 14 16:18:16 EDT 2011
Author: shawkins
Date: 2011-09-14 16:18:16 -0400 (Wed, 14 Sep 2011)
New Revision: 3497
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
Log:
TEIID-1750 TEIID-1753 converting compaction logic to in place and adding tail tracking
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -176,6 +176,13 @@
public synchronized long getLength() {
return len;
}
+
+ public synchronized void truncate(long length) throws TeiidComponentException { // sets the store's recorded length to the given value
+ truncateDirect(length); // subclass hook that performs the physical truncation
+ len = length; // recorded only after truncateDirect returns without throwing
+ }
+
+ protected abstract void truncateDirect(long length) throws TeiidComponentException; // implemented per storage type; truncate calls it while holding this store's monitor
public int read(long fileOffset, byte[] b, int offSet, int length)
throws TeiidComponentException {
@@ -199,21 +206,21 @@
} while (n < length);
}
- public void write(byte[] bytes) throws TeiidComponentException {
- write(bytes, 0, bytes.length);
+ public synchronized long write(byte[] bytes, int offset, int length) throws TeiidComponentException { // append form: delegates with start = len, the current length
+ return write(len, bytes, offset, length);
}
-
- public synchronized long write(byte[] bytes, int offset, int length) throws TeiidComponentException {
+
+ public synchronized long write(long start, byte[] bytes, int offset, int length) throws TeiidComponentException { // positional write; returns the length the store had before this call, not start
if (removed) {
throw new TeiidComponentException("already removed"); //$NON-NLS-1$
}
- writeDirect(bytes, offset, length);
+ writeDirect(start, bytes, offset, length);
long result = len;
- len += length;
+ len = Math.max(len, start + length); // len grows only when the write extends past the current end
return result;
}
- protected abstract void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException;
+ protected abstract void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException;
public synchronized void remove() {
if (!this.removed) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -23,25 +23,24 @@
package org.teiid.common.buffer.impl;
import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -113,13 +112,15 @@
}
private final class BatchManagerImpl implements BatchManager {
- private final String id;
- private volatile FileStore store;
- private Map<Long, long[]> physicalMapping = new ConcurrentHashMap<Long, long[]>();
- private ReadWriteLock compactionLock = new ReentrantReadWriteLock();
- private AtomicLong unusedSpace = new AtomicLong();
+ final String id;
+ volatile FileStore store;
+ Map<Long, long[]> physicalMapping = new HashMap<Long, long[]>();
+ long tail;
+ ConcurrentSkipListSet<Long> freed = new ConcurrentSkipListSet<Long>();
+ ReadWriteLock compactionLock = new ReentrantReadWriteLock();
+ AtomicLong unusedSpace = new AtomicLong();
private int[] lobIndexes;
- private SizeUtility sizeUtility;
+ SizeUtility sizeUtility;
private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
private BatchManagerImpl(String newID, int[] lobIndexes) {
@@ -130,6 +131,16 @@
this.sizeUtility = new SizeUtility();
}
+ private void freeBatch(Long batch) { // drops the batch's physical mapping and accounts for the space it occupied
+ long[] info = physicalMapping.remove(batch); // info[0] is used below as the start offset and info[1] as the byte size
+ if (info != null) {
+ unusedSpace.addAndGet(info[1]);
+ if (info[0] + info[1] == tail) { // the freed batch ended exactly at tail
+ tail -= info[1]; // pull tail back over the freed bytes; NOTE(review): those bytes were also added to unusedSpace above — confirm the double accounting is intended
+ }
+ }
+ }
+
public FileStore createStorage(String prefix) {
return createFileStore(id+prefix);
}
@@ -143,55 +154,84 @@
return mbi;
}
- private boolean shouldCompact(long offset) {
- return offset > COMPACTION_THRESHOLD && unusedSpace.get() * 4 > offset * 3;
- }
-
private long getOffset() throws TeiidComponentException {
- long offset = store.getLength();
- if (!shouldCompact(offset)) {
- return offset;
+ if (store.getLength() <= compactionThreshold || unusedSpace.get() * 4 <= store.getLength() * 3) { // compact only when the store exceeds the threshold and more than 3/4 of its length is unused
+ return tail;
}
- try {
- this.compactionLock.writeLock().lock();
- offset = store.getLength();
- //retest the condition to ensure that compaction is still needed
- if (!shouldCompact(offset)) {
- return offset;
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) { // NOTE(review): uses CTX_DQP while the post-compaction message below uses CTX_BUFFER_MGR — confirm intended
+ LogManager.logDetail(LogConstants.CTX_DQP, "Running full compaction on", id); //$NON-NLS-1$
+ }
+ byte[] buffer = new byte[IO_BUFFER_SIZE]; // scratch buffer shared by every move below
+ TreeSet<long[]> bySize = new TreeSet<long[]>(new Comparator<long[]>() { // mapping entries ordered by ascending size (index 1)
+ @Override
+ public int compare(long[] o1, long[] o2) {
+ int signum = Long.signum(o1[1] - o2[1]);
+ if (signum == 0) {
+ //take the upper address first
+ return Long.signum(o2[0] - o1[0]);
+ }
+ return signum;
}
- FileStore newStore = createFileStore(id);
- newStore.setCleanupReference(this);
- byte[] buffer = new byte[IO_BUFFER_SIZE];
- List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
- Collections.sort(values, new Comparator<long[]>() {
- @Override
- public int compare(long[] o1, long[] o2) {
- return Long.signum(o1[0] - o2[0]);
+ });
+ TreeSet<long[]> byAddress = new TreeSet<long[]>(new Comparator<long[]>() { // the same entries ordered by ascending start offset (index 0)
+
+ @Override
+ public int compare(long[] o1, long[] o2) {
+ return Long.signum(o1[0] - o2[0]);
+ }
+ });
+ bySize.addAll(physicalMapping.values());
+ byAddress.addAll(physicalMapping.values());
+ long lastEndAddress = 0; // end of the last batch placed so far in the compacted layout
+ unusedSpace.set(0); // recomputed below from the gaps that are left in place
+ long minFreeSpace = 1 << 11; // gaps of at most 2048 bytes are left rather than closed
+ while (!byAddress.isEmpty()) {
+ long[] info = byAddress.pollFirst(); // lowest-addressed batch not yet handled
+ bySize.remove(info);
+
+ long currentOffset = info[0];
+ long space = currentOffset - lastEndAddress; // gap in front of this batch
+ while (space > 0 && !bySize.isEmpty()) { // fill the gap with the smallest remaining batches that fit
+ long[] smallest = bySize.first();
+ if (smallest[1] > space) {
+ break;
}
- });
- for (long[] info : values) {
- long oldOffset = info[0];
- info[0] = newStore.getLength();
- int size = (int)info[1];
- while (size > 0) {
- int toWrite = Math.min(IO_BUFFER_SIZE, size);
- store.readFully(oldOffset, buffer, 0, toWrite);
- newStore.write(buffer, 0, toWrite);
- size -= toWrite;
- }
+ bySize.pollFirst();
+ byAddress.remove(smallest); // taken out of both sets before move changes its offset
+ move(smallest, lastEndAddress, buffer);
+ space -= smallest[1];
+ lastEndAddress += smallest[1];
}
- store.remove();
- store = newStore;
- long oldOffset = offset;
- offset = store.getLength();
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+
+ if (space <= minFreeSpace) {
+ unusedSpace.addAndGet(space); // small remaining gap stays where it is and is counted as unused
+ } else {
+ move(info, lastEndAddress, buffer); // gap too large: slide this batch down to close it
}
- return offset;
- } finally {
- this.compactionLock.writeLock().unlock();
+ lastEndAddress = info[0] + info[1]; // info[0] reflects the new position if the batch was moved
}
+ long oldLength = store.getLength();
+ store.truncate(lastEndAddress); // drop everything past the last remaining batch
+ tail = lastEndAddress;
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldLength, "post-size", store.getLength()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ }
+ return tail;
}
+
+ private void move(long[] toMove, long newOffset, byte[] buffer) throws TeiidComponentException { // copies one batch's bytes to newOffset within the same store
+ long oldOffset = toMove[0];
+ toMove[0] = newOffset; // the mapping entry is updated before the bytes are copied
+ int size = (int)toMove[1]; // NOTE(review): narrows the long size to int — assumes a batch is under 2 GB; confirm
+ while (size > 0) {
+ int toWrite = Math.min(IO_BUFFER_SIZE, size); // copy in chunks of at most IO_BUFFER_SIZE bytes
+ store.readFully(oldOffset, buffer, 0, toWrite);
+ store.write(newOffset, buffer, 0, toWrite);
+ size -= toWrite;
+ oldOffset += toWrite; // advance source and destination together
+ newOffset += toWrite;
+ }
+ }
@Override
public void remove() {
@@ -350,7 +390,6 @@
try {
batchManager.compactionLock.readLock().lock();
long[] info = batchManager.physicalMapping.get(this.id);
- Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
batch = new TupleBatch();
batch.setRowOffset(ois.readInt());
@@ -378,7 +417,7 @@
}
public synchronized void persist() throws TeiidComponentException {
- BatchManagerImpl batchManager = managerRef.get();
+ final BatchManagerImpl batchManager = managerRef.get();
if (batchManager == null) {
remove();
return;
@@ -392,23 +431,37 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "writing batch to disk, total writes: ", count); //$NON-NLS-1$
}
- long offset = 0;
if (lobManager != null) {
for (List<?> tuple : batch.getTuples()) {
lobManager.updateReferences(batchManager.lobIndexes, tuple);
}
}
- synchronized (batchManager.store) {
- offset = batchManager.getOffset();
- OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
- ObjectOutputStream oos = new ObjectOutputStream(fsos);
- oos.writeInt(batch.getBeginRow());
- batch.writeExternal(oos);
- oos.close();
- long size = batchManager.store.getLength() - offset;
- long[] info = new long[] {offset, size};
- batchManager.physicalMapping.put(this.id, info);
+ batchManager.compactionLock.writeLock().lock();
+ Long free = null;
+ while ((free = batchManager.freed.pollFirst()) != null) {
+ batchManager.freeBatch(free);
}
+ lockheld = true;
+ final long offset = batchManager.getOffset();
+ ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream(new byte[IO_BUFFER_SIZE]) {
+
+ @Override
+ protected void flushDirect() throws IOException {
+ try {
+ batchManager.store.write(offset + bytesWritten, buf, 0, count);
+ } catch (TeiidComponentException e) {
+ throw new IOException(e);
+ }
+ }
+ };
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ oos.writeInt(batch.getBeginRow());
+ batch.writeExternal(oos);
+ oos.close();
+ long size = fsos.getBytesWritten();
+ long[] info = new long[] {offset, size};
+ batchManager.physicalMapping.put(this.id, info);
+ batchManager.tail = Math.max(batchManager.tail, offset + size);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "batch written starting at:", offset); //$NON-NLS-1$
}
@@ -436,7 +489,12 @@
}
public void remove() {
- cleanupManagedBatch(managerRef.get(), id);
+ activeBatch = null; // clear this batch's own references first
+ batchReference = null;
+ BatchManagerImpl batchManager = managerRef.get();
+ if (batchManager != null) { // skip the cleanup call when managerRef no longer yields a manager
+ cleanupManagedBatch(batchManager, id);
+ }
}
@Override
@@ -475,6 +533,7 @@
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
+ private int compactionThreshold = COMPACTION_THRESHOLD;
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
@@ -581,9 +640,15 @@
}
}
}
- long[] info = batchManager.physicalMapping.remove(id);
- if (info != null) {
- batchManager.unusedSpace.addAndGet(info[1]);
+
+ if (batchManager.compactionLock.writeLock().tryLock()) {
+ try {
+ batchManager.freeBatch(id);
+ } finally {
+ batchManager.compactionLock.writeLock().unlock();
+ }
+ } else {
+ batchManager.freed.add(id);
}
}
@@ -730,7 +795,7 @@
try {
mb.persist();
} catch (TeiidComponentException e) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
}
}
}
@@ -743,8 +808,6 @@
private int[] getSizeEstimates(List<? extends Expression> elements) {
int total = 0;
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
- //we make a assumption that the average column size under 64bits is approximately 128bytes
- //this includes alignment, row/array, and reference overhead
for (Expression element : elements) {
Class<?> type = element.getType();
total += SizeUtility.getSize(isValueCacheEnabled, type);
@@ -829,4 +892,8 @@
this.inlineLobs = inlineLobs;
}
+ public void setCompactionThreshold(int compactionThreshold) { // store length that must be exceeded before getOffset considers compacting
+ this.compactionThreshold = compactionThreshold;
+ }
+
}
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class ExtensibleBufferedOutputStream extends OutputStream { // collects written bytes in buf and hands them to flushDirect whenever the buffer fills or flush is called
+
+ protected int bytesWritten; // bytes already handed to flushDirect; excludes what is still pending in buf
+ protected byte buf[]; // buffer supplied by the caller of the constructor
+ protected int count; // number of pending bytes currently held in buf
+ 
+ public ExtensibleBufferedOutputStream(byte[] buf) {
+ this.buf = buf;
+ }
+ 
+ public void write(int b) throws IOException {
+ if (count >= buf.length) { // buffer full: flush before storing the next byte
+ flush();
+ }
+ buf[count++] = (byte)b;
+ }
+
+ public void write(byte b[], int off, int len) throws IOException {
+ while (true) {
+ int toCopy = Math.min(buf.length - count, len); // as much as fits in the remaining buffer space
+ System.arraycopy(b, off, buf, count, toCopy);
+ count += toCopy;
+ len -= toCopy;
+ off += toCopy;
+ if (count < buf.length) { // buffer not full, so all remaining input was copied
+ break;
+ }
+ flush(); // buffer is exactly full: flush it and keep copying
+ }
+ }
+
+ public void flush() throws IOException {
+ if (count > 0) {
+ flushDirect(); // called before bytesWritten is advanced, so the subclass sees the total from earlier flushes
+ }
+ bytesWritten += count;
+ count = 0;
+ }
+
+ protected abstract void flushDirect() throws IOException; // subclass writes the first count bytes of buf to its destination
+
+ @Override
+ public void close() throws IOException { // only flushes pending bytes; nothing else is released here
+ flush();
+ }
+ 
+ public int getBytesWritten() {
+ return bytesWritten;
+ }
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -100,9 +100,6 @@
this.name = name;
}
- /**
- * Concurrent reads are possible, but only after writing is complete.
- */
public synchronized int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
try {
RandomAccessFile fileAccess = fileInfo.open();
@@ -118,39 +115,45 @@
/**
* Concurrent writes are prevented by FileStore, but in general should not happen since processing is single threaded.
*/
- public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
+ public void writeDirect(long fileOffset, byte[] bytes, int offset, int length) throws TeiidComponentException { // writes length bytes from bytes[offset..] at fileOffset in the backing file; NOTE(review): usedBufferSpace below grows by length on every call, including writes over existing bytes — confirm the accounting
long used = usedBufferSpace.addAndGet(length);
if (used > maxBufferSpace) {
usedBufferSpace.addAndGet(-length);
+ //TODO: trigger a compaction before this is thrown
throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", maxBufferSpace)); //$NON-NLS-1$
}
- long fileOffset = 0;
if (fileInfo == null) {
fileInfo = new FileInfo(createFile(name));
- if (fileInfo != null) {
- fileOffset += fileInfo.file.length();
- }
}
- synchronized (this) {
- try {
- RandomAccessFile fileAccess = fileInfo.open();
- long pointer = fileAccess.length();
- fileAccess.setLength(pointer + length);
- fileAccess.seek(pointer);
- fileAccess.write(bytes, offset, length);
- } catch(IOException e) {
- throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
- } finally {
- fileInfo.close();
- }
- }
+ try {
+ RandomAccessFile fileAccess = fileInfo.open();
+ fileAccess.setLength(fileOffset + length); // NOTE(review): if this is java.io.RandomAccessFile, setLength also shrinks a longer file, so a write that ends before the current end would cut off the data after it — confirm this is safe for the in-place moves done during compaction
+ fileAccess.seek(fileOffset);
+ fileAccess.write(bytes, offset, length);
+ } catch(IOException e) {
+ throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
+ } finally {
+ fileInfo.close(); // runs on both the success and failure paths
+ }
}
- public synchronized void removeDirect() {
+ public void removeDirect() { // the synchronized modifier is dropped on this method in this revision
usedBufferSpace.addAndGet(-len);
- fileInfo.delete();
+ if (fileInfo != null){ // fileInfo is only created inside writeDirect, so it is null when nothing was written
+ fileInfo.delete();
+ }
}
+ @Override
+ protected void truncateDirect(long length) throws TeiidComponentException { // sets the backing file's length to the given value
+ try {
+ RandomAccessFile raf = fileInfo.open(); // NOTE(review): no matching fileInfo.close() here, unlike the finally block in writeDirect, and fileInfo is not null-checked — confirm
+ raf.setLength(length); // NOTE(review): usedBufferSpace is not reduced for the bytes dropped here — confirm the space accounting
+ } catch (IOException e) {
+ throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
+ }
+ }
+
}
// Initialization
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -45,19 +45,20 @@
private ByteBuffer buffer = ByteBuffer.allocate(1 << 16);
@Override
- public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
- if (getLength() + length > buffer.capacity()) {
+ public void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException { // copies length bytes from bytes[offset..] into the in-memory buffer at position start
+ buffer.position((int)start);
+ if (buffer.position() + length > buffer.capacity()) { // grow when the write would run past the current capacity
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2 + length);
buffer.position(0);
newBuffer.put(buffer);
buffer = newBuffer;
+ buffer.position((int)start); // re-seek: the bulk put above advanced the new buffer's position
}
- buffer.position((int)getLength());
buffer.put(bytes, offset, length);
}
}
@Override
- public synchronized void removeDirect() {
+ public void removeDirect() {
removed.incrementAndGet();
buffer = ByteBuffer.allocate(0);
}
@@ -74,6 +75,15 @@
buffer.get(b, offset, length);
return length;
}
+
+ @Override
+ protected void truncateDirect(long length) { // replaces the buffer with a copy of its first length bytes
+ ByteBuffer newBuffer = ByteBuffer.allocate((int)length); // replacement buffer whose capacity equals the new length
+ buffer.position(0);
+ buffer.limit(newBuffer.capacity()); // restrict the copy to the first length bytes
+ newBuffer.put(buffer);
+ buffer = newBuffer;
+ }
};
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -22,6 +22,7 @@
package org.teiid.common.buffer.impl;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -57,14 +58,16 @@
this.name = name;
}
+ @Override
public int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
Map.Entry<Long, FileStore> entry = storageFiles.floorEntry(fileOffset);
FileStore fileInfo = entry.getValue();
return fileInfo.read(fileOffset - entry.getKey(), b, offSet, length);
}
- public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
- Map.Entry<Long, FileStore> entry = this.storageFiles.lastEntry();
+ @Override
+ public void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException {
+ Map.Entry<Long, FileStore> entry = this.storageFiles.floorEntry(start);
boolean createNew = false;
FileStore fileInfo = null;
long fileOffset = 0;
@@ -73,7 +76,10 @@
} else {
fileInfo = entry.getValue();
fileOffset = entry.getKey();
- createNew = entry.getValue().getLength() + length > getMaxFileSize();
+ if (start > entry.getValue().getLength() + fileOffset) {
+ throw new AssertionError("invalid write start location"); //$NON-NLS-1$
+ }
+ createNew = start + length > getMaxFileSize();
}
if (createNew) {
FileStore newFileInfo = storageManager.createFileStore(name + "_" + storageFiles.size()); //$NON-NLS-1$
@@ -83,15 +89,36 @@
storageFiles.put(fileOffset, newFileInfo);
fileInfo = newFileInfo;
}
- fileInfo.write(bytes, offset, length);
+ fileInfo.write(start - fileOffset, bytes, offset, length);
}
public void removeDirect() {
for (FileStore info : storageFiles.values()) {
info.remove();
}
+ storageFiles.clear();
}
+ @Override
+ protected void truncateDirect(long length)
+ throws TeiidComponentException {
+ Map.Entry<Long, FileStore> start = storageFiles.floorEntry(length); // entry with the greatest key <= length; keys are the offsets at which each backing file starts
+ if (start == null) {
+ return;
+ }
+ if (start.getKey().longValue() == length) { // the cut falls exactly at the start of a file: remove that whole file
+ start.getValue().remove();
+ storageFiles.remove(start.getKey());
+ } else {
+ start.getValue().truncate(length - start.getKey()); // translate the overall length into a length within that file
+ }
+ for (Iterator<FileStore> iter = storageFiles.tailMap(length, false).values().iterator(); iter.hasNext();) { // every file starting strictly after length is removed and unmapped
+ iter.next().remove();
+ iter.remove();
+ }
+
+ }
+
}
public long getMaxFileSize() {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -100,5 +100,34 @@
}
}
-
+
+ /**
+ * Forces the logic through several compaction cycles by using large strings
+ * @throws TeiidComponentException
+ */
+ @Test public void testCompaction() throws TeiidComponentException {
+ BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+ bm.setProcessorBatchSize(32);
+ bm.setMaxReserveKB(0);//force all to disk
+ bm.setCompactionThreshold(0); // zero threshold: any non-empty store passes the size check in getOffset
+ bm.initialize();
+ 
+ ElementSymbol e1 = new ElementSymbol("x");
+ e1.setType(String.class);
+ List<ElementSymbol> elements = Arrays.asList(e1);
+ STree map = bm.createSTree(elements, "1", 1);
+ 
+ int size = 1000;
+ 
+ for (int i = 0; i < size; i++) { // each inserted row carries a string built from a 1000-byte array
+ assertNull(map.insert(Arrays.asList(new String(new byte[1000])), InsertMode.ORDERED, size));
+ assertEquals(i + 1, map.getRowCount());
+ }
+ 
+ for (int i = 0; i < size; i++) { // remove the same number of rows again; each remove must return a non-null result
+ assertNotNull(map.remove(Arrays.asList(new String(new byte[1000]))));
+ }
+ 
+ }
+
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java 2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java 2011-09-14 20:18:16 UTC (rev 3497)
@@ -48,5 +48,26 @@
assertEquals(2, msm.getRemoved());
}
+
+ @Test public void testTruncate() throws Exception {
+ MemoryStorageManager msm = new MemoryStorageManager();
+ SplittableStorageManager ssm = new SplittableStorageManager(msm);
+ ssm.setMaxFileSizeDirect(2048);
+ String tsID = "0"; //$NON-NLS-1$
+ // Add one batch
+ FileStore store = ssm.createFileStore(tsID);
+ TestFileStorageManager.writeBytes(store);
+ 
+ assertEquals(1, msm.getCreated());
+ TestFileStorageManager.writeBytes(store); // the second write is expected to spill into a second backing store
+ 
+ assertEquals(2, msm.getCreated());
+ 
+ store.truncate(100); // length 100 falls inside the first backing store, so only the later one should be removed
+ 
+ assertEquals(1, msm.getRemoved());
+ 
+ }
+
}
More information about the teiid-commits
mailing list