[teiid-commits] teiid SVN: r3497 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Sep 14 16:18:16 EDT 2011


Author: shawkins
Date: 2011-09-14 16:18:16 -0400 (Wed, 14 Sep 2011)
New Revision: 3497

Added:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
   trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
Log:
TEIID-1750 TEIID-1753 converting compaction logic to in place and adding tail tracking

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-09-14 20:18:16 UTC (rev 3497)
@@ -176,6 +176,13 @@
 	public synchronized long getLength() {
 		return len;
 	}
+	
+	public synchronized void truncate(long length) throws TeiidComponentException {
+		truncateDirect(length);
+		len = length;
+	}
+	
+	protected abstract void truncateDirect(long length) throws TeiidComponentException;
 		
 	public int read(long fileOffset, byte[] b, int offSet, int length)
 			throws TeiidComponentException {
@@ -199,21 +206,21 @@
     	} while (n < length);
 	}
 	
-	public void write(byte[] bytes) throws TeiidComponentException {
-		write(bytes, 0, bytes.length);
+	public synchronized long write(byte[] bytes, int offset, int length) throws TeiidComponentException {
+		return write(len, bytes, offset, length);
 	}
-
-	public synchronized long write(byte[] bytes, int offset, int length) throws TeiidComponentException {
+	
+	public synchronized long write(long start, byte[] bytes, int offset, int length) throws TeiidComponentException {
 		if (removed) {
 			throw new TeiidComponentException("already removed"); //$NON-NLS-1$
 		}
-		writeDirect(bytes, offset, length);
+		writeDirect(start, bytes, offset, length);
 		long result = len;
-		len += length;		
+		len = Math.max(len, start + length);
 		return result;
 	}
 
-	protected abstract void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException;
+	protected abstract void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException;
 
 	public synchronized void remove() {
 		if (!this.removed) {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-14 20:18:16 UTC (rev 3497)
@@ -23,25 +23,24 @@
 package org.teiid.common.buffer.impl;
 
 import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.OutputStream;
 import java.lang.ref.Reference;
 import java.lang.ref.ReferenceQueue;
 import java.lang.ref.SoftReference;
 import java.lang.ref.WeakReference;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -113,13 +112,15 @@
 	}
 	
 	private final class BatchManagerImpl implements BatchManager {
-		private final String id;
-		private volatile FileStore store;
-		private Map<Long, long[]> physicalMapping = new ConcurrentHashMap<Long, long[]>();
-		private ReadWriteLock compactionLock = new ReentrantReadWriteLock();
-		private AtomicLong unusedSpace = new AtomicLong();
+		final String id;
+		volatile FileStore store;
+		Map<Long, long[]> physicalMapping = new HashMap<Long, long[]>();
+		long tail;
+		ConcurrentSkipListSet<Long> freed = new ConcurrentSkipListSet<Long>(); 
+		ReadWriteLock compactionLock = new ReentrantReadWriteLock();
+		AtomicLong unusedSpace = new AtomicLong();
 		private int[] lobIndexes;
-		private SizeUtility sizeUtility;
+		SizeUtility sizeUtility;
 		private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
 
 		private BatchManagerImpl(String newID, int[] lobIndexes) {
@@ -130,6 +131,16 @@
 			this.sizeUtility = new SizeUtility();
 		}
 		
+		private void freeBatch(Long batch) {
+			long[] info = physicalMapping.remove(batch);
+			if (info != null) { 
+				unusedSpace.addAndGet(info[1]); 
+				if (info[0] + info[1] == tail) {
+					tail -= info[1];
+				}
+			}
+		}
+		
 		public FileStore createStorage(String prefix) {
 			return createFileStore(id+prefix);
 		}
@@ -143,55 +154,84 @@
 			return mbi;
 		}
 		
-		private boolean shouldCompact(long offset) {
-			return offset > COMPACTION_THRESHOLD && unusedSpace.get() * 4 > offset * 3;
-		}
-		
 		private long getOffset() throws TeiidComponentException {
-			long offset = store.getLength();
-			if (!shouldCompact(offset)) {
-				return offset;
+			if (store.getLength() <= compactionThreshold || unusedSpace.get() * 4 <= store.getLength() * 3) {
+				return tail;
 			}
-			try {
-				this.compactionLock.writeLock().lock();
-				offset = store.getLength();
-				//retest the condition to ensure that compaction is still needed
-				if (!shouldCompact(offset)) {
-					return offset;
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_DQP, "Running full compaction on", id); //$NON-NLS-1$
+			}
+			byte[] buffer = new byte[IO_BUFFER_SIZE];
+			TreeSet<long[]> bySize = new TreeSet<long[]>(new Comparator<long[]>() {
+				@Override
+				public int compare(long[] o1, long[] o2) {
+					int signum = Long.signum(o1[1] - o2[1]);
+					if (signum == 0) {
+						//take the upper address first
+						return Long.signum(o2[0] - o1[0]);
+					}
+					return signum;
 				}
-				FileStore newStore = createFileStore(id);
-				newStore.setCleanupReference(this);
-				byte[] buffer = new byte[IO_BUFFER_SIZE];
-				List<long[]> values = new ArrayList<long[]>(physicalMapping.values());
-				Collections.sort(values, new Comparator<long[]>() {
-					@Override
-					public int compare(long[] o1, long[] o2) {
-						return Long.signum(o1[0] - o2[0]);
+			});
+			TreeSet<long[]> byAddress = new TreeSet<long[]>(new Comparator<long[]>() {
+				
+				@Override
+				public int compare(long[] o1, long[] o2) {
+					return Long.signum(o1[0] - o2[0]);
+				}
+			});
+			bySize.addAll(physicalMapping.values());
+			byAddress.addAll(physicalMapping.values());
+			long lastEndAddress = 0;
+			unusedSpace.set(0);
+			long minFreeSpace = 1 << 11;
+			while (!byAddress.isEmpty()) {
+				long[] info = byAddress.pollFirst();
+				bySize.remove(info);
+
+				long currentOffset = info[0];
+				long space = currentOffset - lastEndAddress;
+				while (space > 0 && !bySize.isEmpty()) {
+					long[] smallest = bySize.first();
+					if (smallest[1] > space) {
+						break;
 					}
-				});
-				for (long[] info : values) {
-					long oldOffset = info[0];
-					info[0] = newStore.getLength();
-					int size = (int)info[1];
-					while (size > 0) {
-						int toWrite = Math.min(IO_BUFFER_SIZE, size);
-						store.readFully(oldOffset, buffer, 0, toWrite);
-						newStore.write(buffer, 0, toWrite);
-						size -= toWrite;
-					}
+					bySize.pollFirst();
+					byAddress.remove(smallest);
+					move(smallest, lastEndAddress, buffer);
+					space -= smallest[1];
+					lastEndAddress += smallest[1];
 				}
-				store.remove();
-				store = newStore;
-				long oldOffset = offset;
-				offset = store.getLength();
-				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
-					LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldOffset, "post-size", offset); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+				
+				if (space <= minFreeSpace) {
+					unusedSpace.addAndGet(space);
+				} else {
+					move(info, lastEndAddress, buffer);
 				}
-				return offset;
-			} finally {
-				this.compactionLock.writeLock().unlock();
+				lastEndAddress = info[0] + info[1];
 			}
+			long oldLength = store.getLength();
+			store.truncate(lastEndAddress);
+			tail = lastEndAddress;
+			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Compacted store", id, "pre-size", oldLength, "post-size", store.getLength()); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+			}
+			return tail;
 		}
+		
+		private void move(long[] toMove, long newOffset, byte[] buffer) throws TeiidComponentException {
+			long oldOffset = toMove[0];
+			toMove[0] = newOffset;
+			int size = (int)toMove[1];
+			while (size > 0) {
+				int toWrite = Math.min(IO_BUFFER_SIZE, size);
+				store.readFully(oldOffset, buffer, 0, toWrite);
+				store.write(newOffset, buffer, 0, toWrite);
+				size -= toWrite;
+				oldOffset += toWrite;
+				newOffset += toWrite;
+			}
+		}
 
 		@Override
 		public void remove() {
@@ -350,7 +390,6 @@
 				try {
 					batchManager.compactionLock.readLock().lock();
 					long[] info = batchManager.physicalMapping.get(this.id);
-					Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
 					ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
 		            batch = new TupleBatch();
 		            batch.setRowOffset(ois.readInt());
@@ -378,7 +417,7 @@
 		}
 
 		public synchronized void persist() throws TeiidComponentException {
-			BatchManagerImpl batchManager = managerRef.get();
+			final BatchManagerImpl batchManager = managerRef.get();
 			if (batchManager == null) {
 				remove();
 				return;
@@ -392,23 +431,37 @@
 						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
 							LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "writing batch to disk, total writes: ", count); //$NON-NLS-1$
 						}
-						long offset = 0;
 						if (lobManager != null) {
 							for (List<?> tuple : batch.getTuples()) {
 								lobManager.updateReferences(batchManager.lobIndexes, tuple);
 							}
 						}
-						synchronized (batchManager.store) {
-							offset = batchManager.getOffset();
-							OutputStream fsos = new BufferedOutputStream(batchManager.store.createOutputStream(), IO_BUFFER_SIZE);
-				            ObjectOutputStream oos = new ObjectOutputStream(fsos);
-				            oos.writeInt(batch.getBeginRow());
-				            batch.writeExternal(oos);
-				            oos.close();
-				            long size = batchManager.store.getLength() - offset;
-				            long[] info = new long[] {offset, size};
-				            batchManager.physicalMapping.put(this.id, info);
+						batchManager.compactionLock.writeLock().lock();
+						Long free = null;
+						while ((free = batchManager.freed.pollFirst()) != null) {
+							batchManager.freeBatch(free);
 						}
+						lockheld = true;
+						final long offset = batchManager.getOffset();
+						ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream(new byte[IO_BUFFER_SIZE]) {
+							
+							@Override
+							protected void flushDirect() throws IOException {
+								try {
+									batchManager.store.write(offset + bytesWritten, buf, 0, count);
+								} catch (TeiidComponentException e) {
+									throw new IOException(e);
+								}
+							}
+						};
+			            ObjectOutputStream oos = new ObjectOutputStream(fsos);
+			            oos.writeInt(batch.getBeginRow());
+			            batch.writeExternal(oos);
+			            oos.close();
+			            long size = fsos.getBytesWritten();
+			            long[] info = new long[] {offset, size};
+			            batchManager.physicalMapping.put(this.id, info);
+			            batchManager.tail = Math.max(batchManager.tail, offset + size);
 						if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 							LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "batch written starting at:", offset); //$NON-NLS-1$
 						}
@@ -436,7 +489,12 @@
 		}
 
 		public void remove() {
-			cleanupManagedBatch(managerRef.get(), id);
+			activeBatch = null;
+			batchReference = null;
+			BatchManagerImpl batchManager = managerRef.get();
+			if (batchManager != null) {
+				cleanupManagedBatch(batchManager, id);
+			}
 		}
 				
 		@Override
@@ -475,6 +533,7 @@
     private boolean useWeakReferences = true;
     private boolean inlineLobs = true;
     private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
+	private int compactionThreshold = COMPACTION_THRESHOLD;
 
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
@@ -581,9 +640,15 @@
 				}
 			}
 		}
-		long[] info = batchManager.physicalMapping.remove(id);
-		if (info != null) {
-			batchManager.unusedSpace.addAndGet(info[1]); 
+		
+		if (batchManager.compactionLock.writeLock().tryLock()) {
+			try {
+				batchManager.freeBatch(id);
+			} finally {
+				batchManager.compactionLock.writeLock().unlock();
+			}
+		} else {
+			batchManager.freed.add(id);
 		}
     }
     
@@ -730,7 +795,7 @@
 			try {
 				mb.persist();
 			} catch (TeiidComponentException e) {
-				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
+				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read that batch later will result in an exception"); //$NON-NLS-1$
 			}
 		}
 	}
@@ -743,8 +808,6 @@
 	private int[] getSizeEstimates(List<? extends Expression> elements) {
 		int total = 0;
 		boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
-		//we make a assumption that the average column size under 64bits is approximately 128bytes
-		//this includes alignment, row/array, and reference overhead
 		for (Expression element : elements) {
 			Class<?> type = element.getType();
 			total += SizeUtility.getSize(isValueCacheEnabled, type);
@@ -829,4 +892,8 @@
 		this.inlineLobs = inlineLobs;
 	}
 	
+	public void setCompactionThreshold(int compactionThreshold) {
+		this.compactionThreshold = compactionThreshold;
+	}
+	
 }

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java	2011-09-14 20:18:16 UTC (rev 3497)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class ExtensibleBufferedOutputStream extends OutputStream {
+	
+	protected int bytesWritten;
+    protected byte buf[];
+    protected int count;
+    
+    public ExtensibleBufferedOutputStream(byte[] buf) {
+    	this.buf = buf;
+	}
+    
+    public void write(int b) throws IOException {
+		if (count >= buf.length) {
+		    flush();
+		}
+		buf[count++] = (byte)b;
+    }
+
+    public void write(byte b[], int off, int len) throws IOException {
+    	while (true) {
+    		int toCopy = Math.min(buf.length - count, len);
+			System.arraycopy(b, off, buf, count, toCopy);
+			count += toCopy;
+			len -= toCopy;
+			off += toCopy;
+			if (count < buf.length) {
+				break;
+			}
+			flush();
+    	}
+    }
+
+	public void flush() throws IOException {
+		if (count > 0) {
+			flushDirect();
+		}
+		bytesWritten += count;
+		count = 0;
+	}
+	
+	protected abstract void flushDirect() throws IOException;
+    
+    @Override
+    public void close() throws IOException {
+		flush();
+    }
+    
+    public int getBytesWritten() {
+		return bytesWritten;
+	}
+
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java	2011-09-14 20:18:16 UTC (rev 3497)
@@ -100,9 +100,6 @@
 			this.name = name;
 		}
 	    
-	    /**
-	     * Concurrent reads are possible, but only after writing is complete.
-	     */
 	    public synchronized int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
 			try {
 				RandomAccessFile fileAccess = fileInfo.open();
@@ -118,39 +115,45 @@
 	    /**
 	     * Concurrent writes are prevented by FileStore, but in general should not happen since processing is single threaded.
 	     */
-		public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
+		public void writeDirect(long fileOffset, byte[] bytes, int offset, int length) throws TeiidComponentException {
 			long used = usedBufferSpace.addAndGet(length);
 			if (used > maxBufferSpace) {
 				usedBufferSpace.addAndGet(-length);
+				//TODO: trigger a compaction before this is thrown
 				throw new TeiidComponentException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", maxBufferSpace)); //$NON-NLS-1$
 			}
-			long fileOffset = 0;
 			if (fileInfo == null) {
 				fileInfo = new FileInfo(createFile(name));
-	            if (fileInfo != null) {
-	            	fileOffset += fileInfo.file.length();
-	            }
 	        }
-			synchronized (this) {
-		        try {
-		        	RandomAccessFile fileAccess = fileInfo.open();
-		            long pointer = fileAccess.length();
-		            fileAccess.setLength(pointer + length);
-		            fileAccess.seek(pointer);
-		            fileAccess.write(bytes, offset, length);
-		        } catch(IOException e) {
-		            throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
-		        } finally {
-		        	fileInfo.close();
-		        }
-			}
+	        try {
+	        	RandomAccessFile fileAccess = fileInfo.open();
+	            fileAccess.setLength(fileOffset + length);
+	            fileAccess.seek(fileOffset);
+	            fileAccess.write(bytes, offset, length);
+	        } catch(IOException e) {
+	            throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
+	        } finally {
+	        	fileInfo.close();
+	        }
 		}
 		
-		public synchronized void removeDirect() {
+		public void removeDirect() {
 			usedBufferSpace.addAndGet(-len);
-			fileInfo.delete();
+			if (fileInfo != null){
+				fileInfo.delete();
+			}
 		}
 		
+		@Override
+		protected void truncateDirect(long length) throws TeiidComponentException {
+			try {
+				RandomAccessFile raf = fileInfo.open();
+				raf.setLength(length);
+			} catch (IOException e) {
+				throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
+			}
+		}
+		
 	}
 
     // Initialization

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-09-14 20:18:16 UTC (rev 3497)
@@ -45,19 +45,20 @@
 			private ByteBuffer buffer = ByteBuffer.allocate(1 << 16);
 			
 			@Override
-			public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
-				if (getLength() + length > buffer.capacity()) {
+			public void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException {
+				buffer.position((int)start);
+				if (buffer.position() + length > buffer.capacity()) {
 					ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2 + length);
 					buffer.position(0);
 					newBuffer.put(buffer);
 					buffer = newBuffer;
+					buffer.position((int)start);
 				}
-				buffer.position((int)getLength());
 				buffer.put(bytes, offset, length);
 			}
 			
 			@Override
-			public synchronized void removeDirect() {
+			public void removeDirect() {
 				removed.incrementAndGet();
 				buffer = ByteBuffer.allocate(0);
 			}
@@ -74,6 +75,15 @@
 				buffer.get(b, offset, length);
 				return length;
 			}
+			
+			@Override
+			protected void truncateDirect(long length) {
+				ByteBuffer newBuffer = ByteBuffer.allocate((int)length);
+				buffer.position(0);
+				buffer.limit(newBuffer.capacity());
+				newBuffer.put(buffer);
+				buffer = newBuffer;
+			}
 		};
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java	2011-09-14 20:18:16 UTC (rev 3497)
@@ -22,6 +22,7 @@
 
 package org.teiid.common.buffer.impl;
 
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 
@@ -57,14 +58,16 @@
 			this.name = name;
 		}
 	    
+	    @Override
 	    public int readDirect(long fileOffset, byte[] b, int offSet, int length) throws TeiidComponentException {
 	    	Map.Entry<Long, FileStore> entry = storageFiles.floorEntry(fileOffset);
 	    	FileStore fileInfo = entry.getValue();
 	    	return fileInfo.read(fileOffset - entry.getKey(), b, offSet, length);
 	    }
 
-		public void writeDirect(byte[] bytes, int offset, int length) throws TeiidComponentException {
-			Map.Entry<Long, FileStore> entry = this.storageFiles.lastEntry();
+	    @Override
+		public void writeDirect(long start, byte[] bytes, int offset, int length) throws TeiidComponentException {
+			Map.Entry<Long, FileStore> entry = this.storageFiles.floorEntry(start);
 			boolean createNew = false;
 			FileStore fileInfo = null;
 			long fileOffset = 0;
@@ -73,7 +76,10 @@
 			} else {
 				fileInfo = entry.getValue();
 				fileOffset = entry.getKey();
-				createNew = entry.getValue().getLength() + length > getMaxFileSize();
+				if (start > entry.getValue().getLength() + fileOffset) {
+					throw new AssertionError("invalid write start location"); //$NON-NLS-1$
+				}
+				createNew = start + length > getMaxFileSize();
 			}
 			if (createNew) {
 				FileStore newFileInfo = storageManager.createFileStore(name + "_" + storageFiles.size()); //$NON-NLS-1$
@@ -83,15 +89,36 @@
 	            storageFiles.put(fileOffset, newFileInfo);
 	            fileInfo = newFileInfo;
 	        }
-			fileInfo.write(bytes, offset, length);
+			fileInfo.write(start - fileOffset, bytes, offset, length);
 		}
 		
 		public void removeDirect() {
 			for (FileStore info : storageFiles.values()) {
 				info.remove();
 			}
+			storageFiles.clear();
 		}
 		
+		@Override
+		protected void truncateDirect(long length)
+				throws TeiidComponentException {
+			Map.Entry<Long, FileStore> start = storageFiles.floorEntry(length);
+			if (start == null) {
+				return;
+			}
+			if (start.getKey().longValue() == length) {
+				start.getValue().remove();
+				storageFiles.remove(start.getKey());
+			} else {
+				start.getValue().truncate(length - start.getKey());
+			}
+			for (Iterator<FileStore> iter = storageFiles.tailMap(length, false).values().iterator(); iter.hasNext();) {
+				iter.next().remove();
+				iter.remove();
+			}
+			
+		}
+		
 	}
 	
     public long getMaxFileSize() {

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestSTree.java	2011-09-14 20:18:16 UTC (rev 3497)
@@ -100,5 +100,34 @@
 		}
 				
 	}
-
+	
+	/**
+	 * Forces the logic through several compaction cycles by using large strings
+	 * @throws TeiidComponentException
+	 */
+	@Test public void testCompaction() throws TeiidComponentException {
+		BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+		bm.setProcessorBatchSize(32);
+		bm.setMaxReserveKB(0);//force all to disk
+		bm.setCompactionThreshold(0);
+		bm.initialize();
+		
+		ElementSymbol e1 = new ElementSymbol("x");
+		e1.setType(String.class);
+		List<ElementSymbol> elements = Arrays.asList(e1);
+		STree map = bm.createSTree(elements, "1", 1);
+		
+		int size = 1000;
+		
+		for (int i = 0; i < size; i++) {
+			assertNull(map.insert(Arrays.asList(new String(new byte[1000])), InsertMode.ORDERED, size));
+			assertEquals(i + 1, map.getRowCount());
+		}
+		
+		for (int i = 0; i < size; i++) {
+			assertNotNull(map.remove(Arrays.asList(new String(new byte[1000]))));
+		}
+				
+	}
+	
 }

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java	2011-09-14 19:19:11 UTC (rev 3496)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSplittableStorageManager.java	2011-09-14 20:18:16 UTC (rev 3497)
@@ -48,5 +48,26 @@
         
         assertEquals(2, msm.getRemoved());
     }
+    
+    @Test public void testTruncate() throws Exception {
+    	MemoryStorageManager msm = new MemoryStorageManager();
+        SplittableStorageManager ssm = new SplittableStorageManager(msm);
+        ssm.setMaxFileSizeDirect(2048);
+        String tsID = "0";     //$NON-NLS-1$
+        // Add one batch
+        FileStore store = ssm.createFileStore(tsID);
+        TestFileStorageManager.writeBytes(store);
+        
+        assertEquals(1, msm.getCreated());
 
+        TestFileStorageManager.writeBytes(store);
+        
+        assertEquals(2, msm.getCreated());
+        
+        store.truncate(100);
+        
+        assertEquals(1, msm.getRemoved());
+        
+    }
+
 }



More information about the teiid-commits mailing list