Author: shawkins
Date: 2011-10-12 19:23:02 -0400 (Wed, 12 Oct 2011)
New Revision: 3549
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
Log:
TEIID-1750 reintroducing block storage with a fixed file allocation scheme
Added: trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+public class BaseCacheEntry implements Comparable<BaseCacheEntry> {
+
+ private Long id;
+ protected long lastAccess;
+ protected double orderingValue;
+
+ public BaseCacheEntry(Long id) {
+ this.id = id;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return getId().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getId().toString();
+ }
+
+ public long getLastAccess() {
+ return lastAccess;
+ }
+
+ public void setLastAccess(long lastAccess) {
+ this.lastAccess = lastAccess;
+ }
+
+ public double getOrderingValue() {
+ return orderingValue;
+ }
+
+ public void setOrderingValue(double orderingValue) {
+ this.orderingValue = orderingValue;
+ }
+
+ @Override
+ public int compareTo(BaseCacheEntry o) {
+ int result = (int) Math.signum(orderingValue - o.orderingValue);
+ if (result == 0) {
+ result = Long.signum(lastAccess - o.lastAccess);
+ if (result == 0) {
+ return Long.signum(id - o.id);
+ }
+ }
+ return result;
+ }
+
+}
\ No newline at end of file
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java 2011-10-12 20:56:57
UTC (rev 3548)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java 2011-10-12 23:23:02
UTC (rev 3549)
@@ -24,28 +24,16 @@
import java.lang.ref.WeakReference;
-public class CacheEntry implements Comparable<CacheEntry>{
+public class CacheEntry extends BaseCacheEntry {
private boolean persistent;
private Object object;
private int sizeEstimate;
- private long lastAccess;
- private double orderingValue;
private WeakReference<? extends Serializer<?>> serializer;
- private Long id;
public CacheEntry(Long id) {
- this.id = id;
+ super(id);
}
- public Long getId() {
- return id;
- }
-
- @Override
- public int hashCode() {
- return getId().hashCode();
- }
-
public int getSizeEstimate() {
return sizeEstimate;
}
@@ -53,35 +41,7 @@
public void setSizeEstimate(int sizeEstimate) {
this.sizeEstimate = sizeEstimate;
}
-
- public long getLastAccess() {
- return lastAccess;
- }
-
- public void setLastAccess(long lastAccess) {
- this.lastAccess = lastAccess;
- }
-
- public double getOrderingValue() {
- return orderingValue;
- }
-
- public void setOrderingValue(double orderingValue) {
- this.orderingValue = orderingValue;
- }
-
- @Override
- public int compareTo(CacheEntry o) {
- int result = (int) Math.signum(orderingValue - o.orderingValue);
- if (result == 0) {
- result = Long.signum(lastAccess - o.lastAccess);
- if (result == 0) {
- return Long.signum(id - o.id);
- }
- }
- return result;
- }
-
+
public boolean equals(Object obj) {
if (obj == this) {
return true;
@@ -92,10 +52,6 @@
return getId().equals(((CacheEntry)obj).getId());
}
- @Override
- public String toString() {
- return getId().toString();
- }
public Object nullOut() {
Object result = getObject();
@@ -124,8 +80,12 @@
this.serializer = serializer;
}
- public WeakReference<? extends Serializer<?>> getSerializer() {
- return serializer;
+ public Serializer<?> getSerializer() {
+ WeakReference<? extends Serializer<?>> ref = this.serializer;
+ if (ref == null) {
+ return null;
+ }
+ return ref.get();
}
}
\ No newline at end of file
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,137 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Provides buffer slices or blocks off of a central
+ * set of buffers.
+ */
+public class BlockByteBuffer {
+
+ private final static class ThreadLocalByteBuffer extends ThreadLocal<ByteBuffer>
{
+ private final ByteBuffer byteBuffer;
+
+ public ThreadLocalByteBuffer(ByteBuffer byteBuffer) {
+ this.byteBuffer = byteBuffer;
+ }
+
+ protected ByteBuffer initialValue() {
+ return byteBuffer.duplicate();
+ }
+ }
+
+ private static class BlockInfo {
+ final ByteBuffer bb;
+ final int block;
+ public BlockInfo(ByteBuffer bb, int block) {
+ this.bb = bb;
+ this.block = block;
+ }
+ }
+
+ private int blockAddressBits;
+ private int segmentAddressBits;
+ private int segmentSize;
+ private int blockSize;
+ private int blockCount;
+ private ThreadLocal<ByteBuffer>[] buffers;
+ private BlockInfo[] bufferCache;
+
+ /**
+ * Creates a new {@link BlockByteBuffer} where each buffer segment will be
+ * 1 << segmentAddressBits (max of 30), and a total size of (1 <<
blockAddressBits)*blockCount.
+ * @param segmentAddressBits
+ * @param blockCount
+ * @param blockAddressBits
+ * @param direct
+ */
+ @SuppressWarnings("unchecked")
+ public BlockByteBuffer(int segmentAddressBits, int blockCount, int blockAddressBits,
boolean direct) {
+ this.segmentAddressBits = segmentAddressBits;
+ this.blockAddressBits = blockAddressBits;
+ this.blockSize = 1 << blockAddressBits;
+ this.segmentSize = 1 << this.segmentAddressBits;
+ this.blockCount = blockCount;
+ long size = ((long)blockCount)<<blockAddressBits;
+ int fullSegments = (int)size>>segmentAddressBits;
+ int lastSegmentSize = (int) (size&(segmentSize-1));
+ int segments = fullSegments;
+ if (lastSegmentSize > 0) {
+ segments++;
+ }
+ buffers = new ThreadLocal[segments];
+ for (int i = 0; i < fullSegments; i++) {
+ buffers[i] = new ThreadLocalByteBuffer(allocate(lastSegmentSize, direct));
+ }
+ if (lastSegmentSize > 0) {
+ buffers[fullSegments] = new ThreadLocalByteBuffer(allocate(lastSegmentSize, direct));
+ }
+ int logSize = 32 - Integer.numberOfLeadingZeros(blockCount);
+ bufferCache = new BlockInfo[Math.min(logSize, 20)];
+ }
+
+ public static ByteBuffer allocate(int size, boolean direct) {
+ if (direct) {
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(size);
+ int longsPerSegment = size>>3;
+ //manually initialize until java 7 when it's mandated (although this may already
have been performed)
+ for (int j = 0; j < longsPerSegment; j++) {
+ newBuffer.putLong(0);
+ }
+ return newBuffer;
+ }
+ return ByteBuffer.allocate(size);
+ }
+
+ /**
+ * Return a buffer containing the given start byte.
+ * It is assumed that the caller will handle blocks in
+ * a thread safe manner.
+ * @param startIndex
+ * @return
+ */
+ public ByteBuffer getByteBuffer(int block) {
+ if (block < 0 || block >= blockCount) {
+ throw new IndexOutOfBoundsException("Invalid block " + block);
//$NON-NLS-1$
+ }
+ int cacheIndex = block&(bufferCache.length -1);
+ BlockInfo info = bufferCache[cacheIndex];
+ if (info != null && info.block == block) {
+ info.bb.rewind();
+ return info.bb;
+ }
+ int segment = block>>(segmentAddressBits-blockAddressBits);
+ ByteBuffer bb = buffers[segment].get();
+ bb.limit(bb.capacity());
+ int position = (block<<blockAddressBits)&(segmentSize-1);
+ bb.position(position);
+ bb.limit(position + blockSize);
+ bb = bb.slice();
+ info = new BlockInfo(bb, block);
+ bufferCache[cacheIndex] = info;
+ return bb;
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+final class BlockInputStream extends InputStream {
+ private final BlockManager manager;
+ private final int maxBlock;
+ int blockIndex;
+ ByteBuffer buf;
+ boolean free;
+ boolean done;
+
+ BlockInputStream(BlockManager manager, int blockCount, boolean free) {
+ this.manager = manager;
+ this.free = free;
+ this.maxBlock = blockCount;
+ }
+
+ @Override
+ public int read() {
+ ensureBytes();
+ if (done) {
+ return -1;
+ }
+ return buf.get() & 0xff;
+ }
+
+ private void ensureBytes() {
+ if (buf == null || buf.remaining() == 0) {
+ if (maxBlock == blockIndex) {
+ done = true;
+ if (blockIndex > 1 && free) {
+ manager.freeBlock(blockIndex - 1, false);
+ }
+ return;
+ }
+ buf = manager.getBlock(blockIndex++);
+ if (blockIndex > 2 && free) {
+ manager.freeBlock(blockIndex - 2, false);
+ }
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ ensureBytes();
+ if (done) {
+ return -1;
+ }
+ len = Math.min(len, buf.remaining());
+ buf.get(b, off, len);
+ return len;
+ }
+
+ public int free(boolean steal) {
+ return manager.free(steal);
+ }
+}
\ No newline at end of file
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Represents an INode
+ */
+public interface BlockManager {
+
+ int getInode();
+
+ ByteBuffer allocateBlock(int index);
+
+ /**
+ * Get the block for a given index. Returns null if the block does not exist.
+ * @param index
+ * @return
+ */
+ ByteBuffer getBlock(int index);
+
+ int freeBlock(int index, boolean steal);
+
+ int free(boolean steal);
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+final class BlockOutputStream extends
+ ExtensibleBufferedOutputStream {
+ private final BlockManager blockManager;
+ int blockNum = -1;
+
+ BlockOutputStream(BlockManager blockManager) {
+ this.blockManager = blockManager;
+ }
+
+ @Override
+ protected ByteBuffer newBuffer() {
+ return blockManager.allocateBlock(++blockNum);
+ }
+
+ @Override
+ protected int flushDirect(int i) throws IOException {
+ return i;
+ }
+}
\ No newline at end of file
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import org.teiid.common.buffer.FileStore;
+import org.teiid.common.buffer.StorageManager;
+
+class BlockStore {
+ final long blockSize;
+ final ConcurrentBitSet blocksInUse;
+ final FileStore[] stores;
+
+ public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog) {
+ this.blockSize = blockSize;
+ int blockCount = 1 << blockCountLog;
+ this.blocksInUse = new ConcurrentBitSet(blockCount,
BufferManagerImpl.CONCURRENCY_LEVEL/2);
+ this.stores = new FileStore[BufferManagerImpl.CONCURRENCY_LEVEL/2];
+ for (int i = 0; i < stores.length; i++) {
+ this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) +
'_' + i);
+ }
+ }
+
+}
\ No newline at end of file
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,714 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.teiid.common.buffer.AutoCleanupUtil;
+import org.teiid.common.buffer.BaseCacheEntry;
+import org.teiid.common.buffer.Cache;
+import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.FileStore;
+import org.teiid.common.buffer.Serializer;
+import org.teiid.common.buffer.StorageManager;
+import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectConverterUtil;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
+import org.teiid.query.QueryPlugin;
+
+/**
+ * Implements storage against a {@link FileStore} abstraction using a fronting
+ * memory buffer with a filesystem paradigm. All objects must go through the
+ * memory (typically off-heap) buffer so that they can be put into their appropriately
+ * sized storage bucket.
+ *
+ * The memory uses a 31bit address space on top of 2^13 byte blocks.
+ *
+ * Therefore there is 2^31*2^13 = 2^44 or 16 terabytes max of addressable space.
+ * This is well beyond any current needs.
+ *
+ * The 64 byte inode format is:
+ * 14 32 bit direct block pointers
+ * 1 32 bit block indirect pointer
+ * 1 32 bit block doubly indirect pointer (should be rarely used)
+ *
+ * This means that the maximum number of blocks available to an object is
+ * 14 + (2^13)/4 + ((2^13)/4)^2 ~= 2^22
+ *
+ * Thus the max serialized object size is: 2^22*(2^13) ~= 32GB.
+ *
+ * Typically the max object size will be much smaller, such as 8MB.
+ *
+ * Inodes are held separately from the data/index blocks, and introduce an overhead
+ * that is ~ 1/128th the size of memory buffer.
+ *
+ * The filesystem stores are broken up into block specific sizes starting with 8KB.
+ *
+ * The root directory "physicalMapping" is held in memory for performance. It
will grow in
+ * proportion to the number of tables/tuplebuffers in use.
+ *
+ * TODO: compact tail storage blocks. there may be dangling blocks causing us to consume
disk space.
+ */
+public class BufferFrontedFileStoreCache implements Cache, StorageManager {
+
+ static final int ADDRESS_BITS = 31;
+ static final int SYSTEM_MASK = 1<<ADDRESS_BITS;
+ static final int BYTES_PER_BLOCK_ADDRESS = 4;
+ static final int INODE_BYTES = 16*BYTES_PER_BLOCK_ADDRESS;
+ static final int LOG_INODE_SIZE = 6;
+ static final int DIRECT_POINTERS = 14;
+ static final int EMPTY_ADDRESS = -1;
+
+ //TODO allow the block size to be configurable
+ static final int LOG_BLOCK_SIZE = 13;
+ static final int BLOCK_SIZE = 1 << LOG_BLOCK_SIZE;
+ static final int BLOCK_MASK = BLOCK_SIZE - 1;
+ static final int ADDRESSES_PER_BLOCK = BLOCK_SIZE/BYTES_PER_BLOCK_ADDRESS;
+ static final int MAX_INDIRECT = DIRECT_POINTERS + ADDRESSES_PER_BLOCK;
+ static final int MAX_DOUBLE_INDIRECT = MAX_INDIRECT + ADDRESSES_PER_BLOCK *
ADDRESSES_PER_BLOCK;
+
+ private enum Mode {
+ GET,
+ UPDATE,
+ ALLOCATE
+ }
+
+ private final class InodeBlockManager implements BlockManager {
+ private int inode;
+ private ByteBuffer inodeBuffer;
+ private final long gid;
+ private final long oid;
+
+ InodeBlockManager(long gid, long oid, int inode) {
+ this.inode = inode;
+ this.gid = gid;
+ this.oid = oid;
+ }
+
+ @Override
+ public int getInode() {
+ return inode;
+ }
+
+ @Override
+ public ByteBuffer getBlock(int index) {
+ int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.GET);
+ return blockByteBuffer.getByteBuffer(dataBlock);
+ }
+
+ private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
+ if (index >= MAX_DOUBLE_INDIRECT || (mode == Mode.ALLOCATE && index >=
maxMemoryBlocks)) {
+ throw new TeiidRuntimeException("Max block number exceeded");
//$NON-NLS-1$
+ }
+ int dataBlock = 0;
+ int position = 0;
+ ByteBuffer info = getInodeBlock();
+ if (index >= MAX_INDIRECT) {
+ position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
+ ByteBuffer next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT, value,
mode);
+ if (next != info) {
+ info = next;
+ //should have traversed to the secondary
+ int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
+ position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_SIZE) {
+ info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ }
+ next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT +
indirectAddressBlock * ADDRESSES_PER_BLOCK, value, mode);
+ if (next != info) {
+ info = next;
+ position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
+ }
+ }
+ } else if (index >= DIRECT_POINTERS) {
+ //indirect
+ position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
+ ByteBuffer next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS,
value, mode);
+ if (next != info) {
+ info = next;
+ position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
+ }
+ } else {
+ position = BYTES_PER_BLOCK_ADDRESS*index;
+ }
+ if (mode == Mode.ALLOCATE) {
+ dataBlock = nextBlock(true);
+ info.putInt(position, dataBlock);
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_SIZE) {
+ //maintain the invariant that the next pointer is empty
+ info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ }
+ } else {
+ dataBlock = info.getInt(position);
+ if (mode == Mode.UPDATE) {
+ info.putInt(position, value);
+ }
+ }
+ return dataBlock;
+ }
+
+ private ByteBuffer updateIndirectBlockInfo(ByteBuffer buf, int index, int position, int
cutOff, int value, Mode mode) {
+ int sib_index = buf.getInt(position);
+ if (index == cutOff) {
+ if (mode == Mode.ALLOCATE) {
+ sib_index = nextBlock(false);
+ buf.putInt(position, sib_index);
+ } else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
+ freeDataBlock(sib_index);
+ return buf;
+ }
+ }
+ return blockByteBuffer.getByteBuffer(sib_index);
+ }
+
+ /**
+ * Get the next dataBlock. When the memory buffer is full we have some
+ * book keeping to do.
+ * @return
+ */
+ private int nextBlock(boolean data) {
+ int next = EMPTY_ADDRESS;
+ memoryEvictionLock.readLock().lock();
+ boolean readLocked = true;
+ try {
+ if ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS) {
+ memoryEvictionLock.readLock().unlock();
+ readLocked = false;
+ next = evictFromMemoryBuffer();
+ }
+ } finally {
+ if (readLocked) {
+ memoryEvictionLock.readLock().unlock();
+ }
+ }
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_DQP, "Allocating",
data?"data":"index", "block", next, "to", gid,
oid); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
+ }
+ return next;
+ }
+
+ @Override
+ public int freeBlock(int index, boolean steal) {
+ int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.UPDATE);
+ if (!steal) {
+ freeDataBlock(dataBlock);
+ }
+ return dataBlock;
+ }
+
+ private void freeDataBlock(int dataBlock) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_DQP, "freeing data block", dataBlock,
"for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ blocksInuse.clear(dataBlock);
+ }
+
+ private ByteBuffer getInodeBlock() {
+ if (inodeBuffer == null) {
+ if (inode == EMPTY_ADDRESS) {
+ this.inode = inodesInuse.getAndSetNextClearBit();
+ if (this.inode == -1) {
+ throw new AssertionError("Out of inodes"); //$NON-NLS-1$
+ }
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "Allocating inode",
this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ ByteBuffer bb = getInodeBlock();
+ bb.putInt(EMPTY_ADDRESS);
+ }
+ inodeBuffer = inodeByteBuffer.getByteBuffer(inode);
+ }
+ return inodeBuffer;
+ }
+
+ @Override
+ public int free(boolean steal) {
+ if (this.inode == EMPTY_ADDRESS) {
+ return EMPTY_ADDRESS;
+ }
+ ByteBuffer bb = getInodeBlock();
+ int dataBlockToSteal = bb.getInt(0);
+ int indirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
+ int doublyIndirectIndexBlock =
bb.getInt(BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
+ boolean freedAll = freeBlock(steal?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS,
true);
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "freeing inode", inode,
"for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ inodesInuse.clear(inode);
+ if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
+ return steal?dataBlockToSteal:EMPTY_ADDRESS;
+ }
+ freedAll = freeIndirectBlock(indirectIndexBlock);
+ if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
+ return steal?dataBlockToSteal:EMPTY_ADDRESS;
+ }
+ bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock);
+ freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
+ freeDataBlock(doublyIndirectIndexBlock);
+ return steal?dataBlockToSteal:EMPTY_ADDRESS;
+ }
+
+ private boolean freeIndirectBlock(int indirectIndexBlock) {
+ ByteBuffer bb = blockByteBuffer.getByteBuffer(indirectIndexBlock);
+ boolean freedAll = freeBlock(0, bb, ADDRESSES_PER_BLOCK, true);
+ freeDataBlock(indirectIndexBlock);
+ return freedAll;
+ }
+
+ private boolean freeBlock(int startPosition, ByteBuffer ib, int numPointers, boolean
primary) {
+ ib.position(startPosition);
+ for (int i = 0; i < numPointers; i++) {
+ int dataBlock = ib.getInt();
+ if (dataBlock == EMPTY_ADDRESS) {
+ return false;
+ }
+ if (primary) {
+ freeDataBlock(dataBlock);
+ } else {
+ freeIndirectBlock(dataBlock);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public ByteBuffer allocateBlock(int blockNum) {
+ int dataBlock = getOrUpdateDataBlockIndex(blockNum, EMPTY_ADDRESS, Mode.ALLOCATE);
+ return blockByteBuffer.getByteBuffer(dataBlock);
+ }
+ }
+
+ private static class PhysicalInfo extends BaseCacheEntry {
+ int inode = EMPTY_ADDRESS;
+ int block = EMPTY_ADDRESS;
+ int sizeIndex = 0;
+ final int memoryBlockCount;
+ final Long gid;
+
+ public PhysicalInfo(Long gid, Long id, int inode, int size) {
+ super(id);
+ this.inode = inode;
+ this.gid = gid;
+ this.memoryBlockCount = (size>>LOG_BLOCK_SIZE) +
((size&BLOCK_MASK)>0?1:0);
+ int blocks = memoryBlockCount;
+ while (blocks >= 1) {
+ this.sizeIndex++;
+ blocks>>=2;
+ }
+ }
+ }
+
+ double crfLamda = .0001;
+
+ StorageManager storageManager;
+ int maxStorageObjectSize = 1 << 23; //8MB
+ private long memoryBufferSpace = 1 << 27;
+ private boolean direct;
+
+ int maxMemoryBlocks;
+ private AtomicLong readAttempts = new AtomicLong();
+ PartiallyOrderedCache<Long, PhysicalInfo> memoryBufferEntries = new
PartiallyOrderedCache<Long, PhysicalInfo>(16, .75f,
BufferManagerImpl.CONCURRENCY_LEVEL) {
+
+ @Override
+ protected void recordAccess(Long key, PhysicalInfo value, boolean initial) {
+ long lastAccess = value.getLastAccess();
+ value.setLastAccess(readAttempts.get());
+ if (initial && lastAccess == 0) {
+ return;
+ }
+ double orderingValue = value.getOrderingValue();
+ orderingValue = computeNextOrderingValue(value.getLastAccess(), lastAccess,
orderingValue);
+ value.setOrderingValue(orderingValue);
+ }
+
+ };
+ private Semaphore memoryWritePermits; //prevents deadlock waiting for free blocks
+ ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock();
+
+ private int blocks;
+ private ConcurrentBitSet blocksInuse;
+ private BlockByteBuffer blockByteBuffer;
+
+ private ConcurrentBitSet inodesInuse;
+ private BlockByteBuffer inodeByteBuffer;
+
+ //root directory
+ private ConcurrentHashMap<Long, Map<Long, PhysicalInfo>> physicalMapping =
new ConcurrentHashMap<Long, Map<Long, PhysicalInfo>>(16, .75f,
BufferManagerImpl.CONCURRENCY_LEVEL);
+ private BlockStore[] sizeBasedStores;
+
+ @Override
+ public void initialize() throws TeiidComponentException {
+ storageManager.initialize();
+ blocks = (int) Math.min(Integer.MAX_VALUE, (memoryBufferSpace>>LOG_BLOCK_SIZE));
+ inodesInuse = new ConcurrentBitSet(blocks+1, BufferManagerImpl.CONCURRENCY_LEVEL);
+ blocksInuse = new ConcurrentBitSet(blocks, BufferManagerImpl.CONCURRENCY_LEVEL);
+ this.blockByteBuffer = new BlockByteBuffer(30, blocks, LOG_BLOCK_SIZE, direct);
+ //ensure that we'll run out of blocks first
+ this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
+ memoryBufferSpace = Math.max(memoryBufferSpace, maxStorageObjectSize);
+ memoryWritePermits = new Semaphore(Math.max(1,
(int)Math.min(memoryBufferSpace/maxStorageObjectSize, Integer.MAX_VALUE)));
+ maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT,
maxStorageObjectSize>>LOG_BLOCK_SIZE);
+ //account for index pointer block overhead
+ if (maxMemoryBlocks > DIRECT_POINTERS) {
+ maxMemoryBlocks--;
+ }
+ if (maxMemoryBlocks > MAX_INDIRECT) {
+ int indirect = maxMemoryBlocks-MAX_INDIRECT;
+ maxMemoryBlocks -= (indirect/ADDRESSES_PER_BLOCK +
(indirect%ADDRESSES_PER_BLOCK>0?1:0) + 1);
+ }
+ List<BlockStore> stores = new ArrayList<BlockStore>();
+ int size = BLOCK_SIZE;
+ do {
+ stores.add(new BlockStore(this.storageManager, size, 30));
+ size <<=2;
+ } while (size>>2 < maxStorageObjectSize);
+ this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
+ }
+
+ double computeNextOrderingValue(long currentTime,
+ long lastAccess, double orderingValue) {
+ orderingValue =
+ //Frequency component
+ orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
+ //recency component
+ + Math.pow(currentTime, crfLamda);
+ return orderingValue;
+ }
+
+ InodeBlockManager getBlockManager(long gid, long oid, int inode) {
+ return new InodeBlockManager(gid, oid, inode);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void add(CacheEntry entry, Serializer s) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "adding object", s.getId(),
entry.getId()); //$NON-NLS-1$
+ }
+ InodeBlockManager blockManager = null;
+ boolean hasPermit = false;
+ PhysicalInfo info = null;
+ boolean newEntry = true;
+ boolean success = false;
+ try {
+ Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
+ if (map == null) {
+ return; //already removed
+ }
+ info = map.get(entry.getId());
+ if (info == null) {
+ if (!map.containsKey(entry.getId())) {
+ return; //already removed
+ }
+ } else {
+ newEntry = false;
+ synchronized (info) {
+ if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(readAttempts.get(),
info)) {
+ success = true;
+ return;
+ }
+ }
+ }
+ memoryWritePermits.acquire();
+ hasPermit = true;
+ blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
+ ExtensibleBufferedOutputStream fsos = new BlockOutputStream(blockManager);
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ oos.writeInt(entry.getSizeEstimate());
+ s.serialize(entry.getObject(), oos);
+ oos.close();
+ synchronized (map) {
+ //synchronize to ensure proper cleanup from a concurrent removal
+ if (physicalMapping.containsKey(s.getId()) &&
map.containsKey(entry.getId())) {
+ if (newEntry) {
+ info = new PhysicalInfo(s.getId(), entry.getId(), blockManager.getInode(),
fsos.getBytesWritten());
+ map.put(entry.getId(), info);
+ memoryBufferEntries.put(entry.getId(), info);
+ }
+ success = true;
+ }
+ }
+ } catch (Throwable e) {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read batch "+ entry.getId() +" later will result in an
exception"); //$NON-NLS-1$ //$NON-NLS-2$
+ } finally {
+ if (hasPermit) {
+ memoryWritePermits.release();
+ }
+ if (!success && blockManager != null) {
+ blockManager.free(false);
+ }
+ }
+ }
+
+ @Override
+ public CacheEntry get(Long oid, Serializer<?> serializer) throws
TeiidComponentException {
+ long currentTime = readAttempts.incrementAndGet();
+ try {
+ Map<Long, PhysicalInfo> map = physicalMapping.get(serializer.getId());
+ if (map == null) {
+ return null;
+ }
+ final PhysicalInfo info = map.get(oid);
+ if (info == null) {
+ return null;
+ }
+ CacheEntry ce = new CacheEntry(oid);
+ InputStream is = null;
+ synchronized (info) {
+ if (info.inode != EMPTY_ADDRESS) {
+ memoryBufferEntries.get(oid); //touch this entry
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode",
info.inode, serializer.getId(), oid); //$NON-NLS-1$
+ }
+ BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
+ is = new BlockInputStream(manager, info.memoryBlockCount, false);
+ } else if (info.block != EMPTY_ADDRESS) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at block",
info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
+ }
+ BlockStore blockStore = sizeBasedStores[info.sizeIndex];
+ FileStore fs =
blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
+ long blockOffset =
(info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
+ is = fs.createInputStream(blockOffset);
+ if (shouldPlaceInMemoryBuffer(currentTime, info) &&
this.memoryWritePermits.tryAcquire()) {
+ BlockManager manager = null;
+ try {
+ manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
+ ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
+ ObjectConverterUtil.write(os, is, -1);
+ memoryBufferEntries.put(info.getId(), info);
+ is = new BlockInputStream(manager, info.memoryBlockCount, false);
+ } finally {
+ this.memoryWritePermits.release();
+ }
+ } else {
+ this.toString();
+ }
+ } else {
+ return null;
+ }
+ }
+ ObjectInputStream ois = new ObjectInputStream(is);
+ ce.setSizeEstimate(ois.readInt());
+ ce.setLastAccess(1);
+ ce.setOrderingValue(1);
+ ce.setObject(serializer.deserialize(ois));
+ ce.setPersistent(true);
+ return ce;
+ } catch(IOException e) {
+ throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
+ } catch (ClassNotFoundException e) {
+ throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
+ }
+ }
+
+ private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
+ Map.Entry<PhysicalInfo, Long> lowest = memoryBufferEntries.firstEntry();
+ return lowest == null
+ || (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) >
(info.memoryBlockCount<<3)
+ || lowest.getKey().getOrderingValue() < computeNextOrderingValue(currentTime,
info.getLastAccess(), info.getOrderingValue());
+ }
+
+ @Override
+ public FileStore createFileStore(String name) {
+ return storageManager.createFileStore(name);
+ }
+
+ public void setDirect(boolean direct) {
+ this.direct = direct;
+ }
+
+ @Override
+ public void addToCacheGroup(Long gid, Long oid) {
+ Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
+ if (map == null) {
+ return;
+ }
+ map.put(oid, null);
+ }
+
+ @Override
+ public void createCacheGroup(Long gid) {
+ physicalMapping.put(gid, Collections.synchronizedMap(new HashMap<Long,
PhysicalInfo>()));
+ }
+
+ @Override
+ public void remove(Long gid, Long id) {
+ Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
+ if (map == null) {
+ return;
+ }
+ PhysicalInfo info = map.remove(id);
+ free(id, info, false);
+ }
+
+ @Override
+ public Collection<Long> removeCacheGroup(Long gid) {
+ Map<Long, PhysicalInfo> map = physicalMapping.remove(gid);
+ if (map == null) {
+ return Collections.emptySet();
+ }
+ synchronized (map) {
+ for (Map.Entry<Long, PhysicalInfo> entry : map.entrySet()) {
+ free(entry.getKey(), entry.getValue(), false);
+ }
+ return map.keySet();
+ }
+ }
+
+ int free(Long oid, PhysicalInfo info, boolean demote) {
+ memoryBufferEntries.remove(oid);
+ if (info == null) {
+ return EMPTY_ADDRESS;
+ }
+ synchronized (info) {
+ memoryBufferEntries.remove(oid);
+ if (info.inode == EMPTY_ADDRESS) {
+ return EMPTY_ADDRESS;
+ }
+ BlockManager bm = getBlockManager(info.gid, oid, info.inode);
+ info.inode = EMPTY_ADDRESS;
+ if (demote) {
+ if (info.block == EMPTY_ADDRESS) {
+ BlockInputStream is = new BlockInputStream(bm, info.memoryBlockCount, true);
+ BlockStore blockStore = sizeBasedStores[info.sizeIndex];
+ FileStore fs =
blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
+ info.block = getAndSetNextClearBit(blockStore);
+ long blockOffset =
(info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
+ byte[] b = new byte[BLOCK_SIZE];
+ int read = 0;
+ boolean errored = false;
+ while ((read = is.read(b, 0, b.length)) != -1) {
+ if (!errored) {
+ try {
+ fs.write(blockOffset, b, 0, read);
+ blockOffset+=read;
+ } catch (Throwable e) {
+ //just continue to free
+ errored = true;
+ LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to
storage " + oid); //$NON-NLS-1$
+ }
+ }
+ }
+ return is.free(true);
+ }
+ return bm.free(true);
+ }
+ bm.free(false);
+ if (info.block != EMPTY_ADDRESS) {
+ BlockStore blockStore = sizeBasedStores[info.sizeIndex];
+ blockStore.blocksInUse.clear(info.block);
+ info.block = EMPTY_ADDRESS;
+ }
+ }
+ return EMPTY_ADDRESS;
+ }
+
+ static int getAndSetNextClearBit(BlockStore bs) {
+ int result = bs.blocksInUse.getAndSetNextClearBit();
+ if (result == -1) {
+ throw new TeiidRuntimeException("Out of blocks of size " + bs.blockSize);
//$NON-NLS-1$
+ }
+ return result;
+ }
+
+ /**
+ * Stop the world eviction. Hopefully this should rarely happen.
+ * @return the stole dataBlock
+ */
+ int evictFromMemoryBuffer() {
+ memoryEvictionLock.writeLock().lock();
+ int next = -1;
+ boolean writeLocked = true;
+ try {
+ for (int i = 0; i < 10 && next == EMPTY_ADDRESS; i++) {
+ AutoCleanupUtil.doCleanup();
+ Iterator<Map.Entry<PhysicalInfo, Long>> iter =
memoryBufferEntries.getEvictionQueue().entrySet().iterator();
+ while ((next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS &&
iter.hasNext()) {
+ Map.Entry<PhysicalInfo, Long> entry = iter.next();
+ PhysicalInfo info = entry.getKey();
+ synchronized (info) {
+ if (info.inode == EMPTY_ADDRESS) {
+ continue;
+ }
+ memoryEvictionLock.writeLock().unlock();
+ writeLocked = false;
+ next = free(entry.getValue(), info, true);
+ }
+ break;
+ }
+ }
+ if (next == -1) {
+ throw new AssertionError("Could not free space for pending write");
//$NON-NLS-1$
+ }
+ } finally {
+ if (writeLocked) {
+ memoryEvictionLock.writeLock().unlock();
+ }
+ }
+ return next;
+ }
+
+ public void setStorageManager(StorageManager storageManager) {
+ this.storageManager = storageManager;
+ }
+
+ public StorageManager getStorageManager() {
+ return storageManager;
+ }
+
+ public void setMemoryBufferSpace(long maxBufferSpace) {
+ this.memoryBufferSpace = Math.min(maxBufferSpace,
1l<<(ADDRESS_BITS+LOG_BLOCK_SIZE));
+ }
+
+ public int getInodesInUse() {
+ return this.inodesInuse.getBitsSet();
+ }
+
+ public int getDataBlocksInUse() {
+ return this.blocksInuse.getBitsSet();
+ }
+
+ public void setMaxStorageObjectSize(int maxStorageBlockSize) {
+ this.maxStorageObjectSize = maxStorageBlockSize;
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-12
20:56:57 UTC (rev 3548)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -187,7 +187,7 @@
try {
//it's expected that the containing structure has updated the lob manager
BatchSerializer.writeBatch(oos, types, obj);
- } catch (IndexOutOfBoundsException e) {
+ } catch (RuntimeException e) {
//there is a chance of a concurrent persist while modifying
//in which case we want to swallow this exception
if (list == null || list.getModCount() == expectedModCount) {
@@ -219,7 +219,7 @@
return (List<List<?>>)(!retain?ce.nullOut():ce.getObject());
}
long count = readCount.incrementAndGet();
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.TRACE)) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, id, id, "reading batch",
batch, "from storage, total reads:", count); //$NON-NLS-1$ //$NON-NLS-2$
}
ce = cache.get(batch, this);
@@ -268,6 +268,7 @@
}
}
+ static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable
private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
private static ReferenceQueue<CacheEntry> SOFT_QUEUE = new
ReferenceQueue<CacheEntry>();
@@ -292,11 +293,11 @@
//combined recency/frequency lamda value between 0 and 1 lower -> LFU, higher
-> LRU
//TODO: adaptively adjust this value. more hits should move closer to lru
- private final double crfLamda = .0002;
+ private final double crfLamda = .001;
//implements a LRFU cache using the a customized crf function. we store the value
with
//the cache entry to make a better decision about reuse of the batch
//TODO: consider the size estimate in the weighting function
- private OrderedCache<Long, CacheEntry> memoryEntries = new
OrderedCache<Long, CacheEntry>() {
+ private PartiallyOrderedCache<Long, CacheEntry> memoryEntries = new
PartiallyOrderedCache<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL) {
@Override
protected void recordAccess(Long key, CacheEntry value, boolean initial) {
@@ -599,7 +600,7 @@
}
void evict(CacheEntry ce) throws Exception {
- Serializer<?> s = ce.getSerializer().get();
+ Serializer<?> s = ce.getSerializer();
if (s == null) {
return;
}
@@ -615,8 +616,8 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to
storage, total writes: ", count); //$NON-NLS-1$
}
- cache.add(ce, s);
}
+ cache.add(ce, s);
if (s.useSoftCache()) {
createSoftReference(ce);
} else if (useWeakReferences) {
@@ -689,7 +690,7 @@
if (inMemory) {
activeBatchKB.addAndGet(-ce.getSizeEstimate());
}
- Serializer<?> s = ce.getSerializer().get();
+ Serializer<?> s = ce.getSerializer();
if (s != null) {
cache.remove(s.getId(), ce.getId());
}
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.BitSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A segmented {@link BitSet} that supports greater concurrency
+ * and faster finding of clear bits.
+ */
+public class ConcurrentBitSet {
+
+ private static class Segment {
+ int offset;
+ int maxBits;
+ int startSearch;
+ int bitsSet;
+ final private BitSet bitSet;
+
+ public Segment(int bitCount) {
+ bitSet = new BitSet();
+ maxBits = bitCount;
+ }
+ }
+
+ private int bitsPerSegment;
+ private int totalBits;
+ private AtomicInteger counter = new AtomicInteger();
+ private AtomicInteger bitsSet = new AtomicInteger();
+ private Segment[] segments;
+
+ public ConcurrentBitSet(int maxBits, int concurrencyLevel) {
+ if (maxBits < concurrencyLevel) {
+ concurrencyLevel = 1;
+ while (maxBits > 2*concurrencyLevel) {
+ concurrencyLevel <<=1;
+ }
+ }
+ segments = new Segment[concurrencyLevel];
+ bitsPerSegment = maxBits/concurrencyLevel;
+ int modBits = maxBits%concurrencyLevel;
+ if (modBits > 0) {
+ bitsPerSegment++;
+ }
+ for (int i = 0; i < concurrencyLevel; i++) {
+ segments[i] = new Segment(bitsPerSegment);
+ segments[i].offset = i*bitsPerSegment;
+ if (i == concurrencyLevel - 1) {
+ segments[i].maxBits -= (bitsPerSegment * concurrencyLevel)-maxBits;
+ }
+ }
+ this.totalBits = maxBits;
+ }
+
+ public void clear(int bitIndex) {
+ checkIndex(bitIndex);
+ Segment s = segments[bitIndex/bitsPerSegment];
+ bitIndex = bitIndex%bitsPerSegment;
+ synchronized (s) {
+ if (!s.bitSet.get(bitIndex)) {
+ throw new AssertionError(bitIndex + " not set"); //$NON-NLS-1$
+ }
+ s.bitSet.clear(bitIndex);
+ s.bitsSet--;
+ }
+ bitsSet.decrementAndGet();
+ }
+
+ /**
+ * Makes a best effort to atomically find the next clear bit and set it
+ * @return the next bit index or -1 if no clear bits are founds
+ */
+ public int getAndSetNextClearBit() {
+ int start = counter.getAndIncrement();
+ int nextBit = -1;
+ for (int i = 0; i < segments.length; i++) {
+ Segment s = segments[(start+i)&(segments.length-1)];
+ synchronized (s) {
+ if (s.bitsSet == s.maxBits) {
+ continue;
+ }
+ nextBit = s.bitSet.nextClearBit(s.startSearch);
+ if (nextBit >= s.maxBits - 1) {
+ s.startSearch = 0;
+ nextBit = s.bitSet.nextClearBit(s.startSearch);
+ if (nextBit >= s.maxBits) {
+ throw new AssertionError("could not find clear bit"); //$NON-NLS-1$
+ }
+ }
+ s.bitsSet++;
+ s.bitSet.set(nextBit);
+ s.startSearch = nextBit + 1;
+ if (s.startSearch == s.maxBits) {
+ s.startSearch = 0;
+ }
+ nextBit += s.offset;
+ break;
+ }
+ }
+ if (nextBit != -1) {
+ bitsSet.getAndIncrement();
+ }
+ return nextBit;
+ }
+
+ private void checkIndex(int bitIndex) {
+ if (bitIndex >= totalBits) {
+ throw new ArrayIndexOutOfBoundsException(bitIndex);
+ }
+ }
+
+ public int getTotalBits() {
+ return totalBits;
+ }
+
+ public int getBitsSet() {
+ return bitsSet.get();
+ }
+
+ public int getBitsPerSegment() {
+ return bitsPerSegment;
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,149 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class PartiallyOrderedCache<K, V> {
+
+ private int maxOrderedSize = 1 << 19;
+
+ protected Map<K, V> map;
+ //TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
+ //the level limits the effective map size to ~ 2^16
+ //where it performs comparably under load to a synchronized LinkedHashMap
+ //just with more CPU overhead vs. wait time.
+ //TODO: have the concurrent version be pluggable
+ protected NavigableMap<V, K> evictionQueue = new TreeMap<V, K>();
+ //when we get to extreme number of entries we overflow into lru
+ protected Map<V, K> evictionQueueHead = new LinkedHashMap<V, K>();
+ //holds entries that are being evicted, but that might not yet be in a lower caching
level
+ protected Map<K, V> limbo;
+
+ public PartiallyOrderedCache(int initialCapacity, float loadFactor, int
concurrencyLevel) {
+ map = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor,
concurrencyLevel);
+ limbo = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor,
concurrencyLevel);
+ }
+
+ public void setMaxOrderedSize(int maxOrderedSize) {
+ this.maxOrderedSize = maxOrderedSize;
+ }
+
+ public V get(K key) {
+ V result = map.get(key);
+ if (result == null) {
+ result = limbo.get(key);
+ }
+ if (result != null) {
+ maintainQueues(key, result, null);
+ }
+ return result;
+ }
+
+ public V remove(K key) {
+ V result = map.remove(key);
+ if (result != null) {
+ synchronized (this) {
+ if (evictionQueue.remove(result) != null) {
+ orderedRemoved();
+ } else {
+ evictionQueueHead.remove(result);
+ }
+ }
+ }
+ return result;
+ }
+
+ private void orderedRemoved() {
+ if (evictionQueue.size() < (maxOrderedSize>>1) &&
evictionQueueHead.size() > 0) {
+ Iterator<Map.Entry<V,K>> i = evictionQueueHead.entrySet().iterator();
+ if (i.hasNext()) {
+ Map.Entry<V, K> entry = i.next();
+ if (map.containsKey(entry.getValue())) {
+ i.remove();
+ evictionQueue.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+
+ public V put(K key, V value) {
+ V result = map.put(key, value);
+ maintainQueues(key, value, result);
+ return result;
+ }
+
+ private void maintainQueues(K key, V value, V old) {
+ synchronized (this) {
+ if (old != null && evictionQueue.remove(old) == null) {
+ evictionQueueHead.remove(old);
+ }
+ recordAccess(key, value, old == null);
+ evictionQueue.put(value, key);
+ if (evictionQueue.size() > maxOrderedSize) {
+ Map.Entry<V, K> last = evictionQueue.pollLastEntry();
+ if (last != null) {
+ if (map.containsKey(last.getValue()) &&
!evictionQueue.containsKey(last.getKey())) {
+ evictionQueueHead.put(last.getKey(), last.getValue());
+ }
+ }
+ }
+ }
+ }
+
+ public V evict() {
+ Map.Entry<V, K> entry = evictionQueue.pollFirstEntry();
+ if (entry == null) {
+ return null;
+ }
+ synchronized (this) {
+ orderedRemoved();
+ }
+ limbo.put(entry.getValue(), entry.getKey());
+ return map.remove(entry.getValue());
+ }
+
+ public Map<V, K> getEvictionQueue() {
+ return evictionQueue;
+ }
+
+ public Map.Entry<V, K> firstEntry() {
+ return evictionQueue.firstEntry();
+ }
+
+ public void finishedEviction(K key) {
+ limbo.remove(key);
+ }
+
+ public int size() {
+ return map.size();
+ }
+
+ protected abstract void recordAccess(K key, V value, boolean initial);
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PartiallyOrderedCache.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
(rev 0)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,171 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.ref.WeakReference;
+
+import org.junit.Test;
+import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.Serializer;
+import org.teiid.core.TeiidComponentException;
+
+public class TestBufferFrontedFileStoreCache {
+
+ private final class SimpleSerializer implements Serializer<Integer> {
+ @Override
+ public Integer deserialize(ObjectInputStream ois)
+ throws IOException, ClassNotFoundException {
+ Integer result = ois.readInt();
+ for (int i = 0; i < result; i++) {
+ assertEquals(i, ois.readInt());
+ }
+ return result;
+ }
+
+ @Override
+ public Long getId() {
+ return 1l;
+ }
+
+ @Override
+ public void serialize(Integer obj, ObjectOutputStream oos)
+ throws IOException {
+ oos.writeInt(obj);
+ for (int i = 0; i < obj; i++) {
+ oos.writeInt(i);
+ }
+ }
+
+ @Override
+ public boolean useSoftCache() {
+ return false;
+ }
+ }
+
+ @Test public void testAddGetMultiBlock() throws Exception {
+ BufferFrontedFileStoreCache cache = createLayeredCache(1 << 26, 1 << 26);
+
+ CacheEntry ce = new CacheEntry(2l);
+ Serializer<Integer> s = new SimpleSerializer();
+ cache.createCacheGroup(s.getId());
+ Integer cacheObject = Integer.valueOf(2);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+
+ ce = cache.get(2l, s);
+ assertEquals(cacheObject, ce.getObject());
+
+ //test something that exceeds the direct inode data blocks
+ ce = new CacheEntry(3l);
+ cacheObject = Integer.valueOf(80000);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+
+ ce = cache.get(3l, s);
+ assertEquals(cacheObject, ce.getObject());
+
+ cache.removeCacheGroup(1l);
+
+ assertEquals(0, cache.getDataBlocksInUse());
+ assertEquals(0, cache.getInodesInUse());
+
+ //test something that exceeds the indirect data blocks
+ ce = new CacheEntry(3l);
+ cache.createCacheGroup(s.getId());
+ cacheObject = Integer.valueOf(5000000);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+
+ ce = cache.get(3l, s);
+ assertEquals(cacheObject, ce.getObject());
+
+ cache.removeCacheGroup(1l);
+
+ assertEquals(0, cache.getDataBlocksInUse());
+ assertEquals(0, cache.getInodesInUse());
+
+ //test something that exceeds the allowable object size
+ ce = new CacheEntry(3l);
+ cache.createCacheGroup(s.getId());
+ cacheObject = Integer.valueOf(500000000);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+
+ ce = cache.get(3l, s);
+ assertNull(ce);
+
+ cache.removeCacheGroup(1l);
+
+ assertEquals(0, cache.getDataBlocksInUse());
+ assertEquals(0, cache.getInodesInUse());
+ }
+
+ @Test public void testEviction() throws Exception {
+ BufferFrontedFileStoreCache cache = createLayeredCache(1<<15, 1<<15);
+
+ CacheEntry ce = new CacheEntry(2l);
+ Serializer<Integer> s = new SimpleSerializer();
+ WeakReference<? extends Serializer<?>> ref = new
WeakReference<Serializer<?>>(s);
+ ce.setSerializer(ref);
+ cache.createCacheGroup(s.getId());
+ Integer cacheObject = Integer.valueOf(5000);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+
+ ce = new CacheEntry(3l);
+ ce.setSerializer(ref);
+ cacheObject = Integer.valueOf(5000);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+
+ assertEquals(3, cache.getDataBlocksInUse());
+ assertEquals(1, cache.getInodesInUse());
+
+ ce = cache.get(2l, s);
+ assertEquals(Integer.valueOf(5000), ce.getObject());
+ }
+
+ private BufferFrontedFileStoreCache createLayeredCache(int bufferSpace, int objectSize)
throws TeiidComponentException {
+ BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
+ fsc.setMemoryBufferSpace(bufferSpace);
+ fsc.setMaxStorageObjectSize(objectSize);
+ fsc.setDirect(false);
+ SplittableStorageManager ssm = new SplittableStorageManager(new
MemoryStorageManager());
+ ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
+ fsc.setStorageManager(ssm);
+ fsc.initialize();
+ return fsc;
+ }
+
+}
Property changes on:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
(rev 0)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,65 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+ package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class TestConcurrentBitSet {
+
+ @Test public void testBitsSet() {
+ ConcurrentBitSet bst = new ConcurrentBitSet(50001, 4);
+ assertEquals(0, bst.getAndSetNextClearBit());
+ assertEquals(12501, bst.getAndSetNextClearBit());
+ assertEquals(25002, bst.getAndSetNextClearBit());
+ assertEquals(37503, bst.getAndSetNextClearBit());
+ assertEquals(1, bst.getAndSetNextClearBit());
+ assertEquals(5, bst.getBitsSet());
+ bst.clear(1);
+ assertEquals(4, bst.getBitsSet());
+ bst.clear(12501);
+ try {
+ bst.clear(30000);
+ fail();
+ } catch (AssertionError e) {
+
+ }
+ assertEquals(3, bst.getBitsSet());
+
+ for (int i = 0; i < bst.getTotalBits()-3;i++) {
+ assertTrue(bst.getAndSetNextClearBit() != -1);
+ }
+
+ bst.clear(5);
+ bst.clear(12505);
+ bst.clear(25505);
+ bst.clear(37505);
+
+ for (int i = 0; i < 4; i++) {
+ int bit = bst.getAndSetNextClearBit();
+ assertTrue(bit < bst.getTotalBits() && bit > 0);
+ }
+ }
+
+}
Property changes on:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
(rev 0)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java 2011-10-12
23:23:02 UTC (rev 3549)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+public class TestPartiallyOrderedCache {
+
+ @Test public void testQueueMaintenance() {
+ PartiallyOrderedCache<Integer, Integer> cache = new
PartiallyOrderedCache<Integer, Integer>(16, .75f, 16) {
+
+ @Override
+ protected void recordAccess(Integer key, Integer value, boolean initial) {
+
+ }
+ };
+
+ cache.setMaxOrderedSize(5);
+
+ for (int i = 0; i < 10; i++) {
+ cache.put(i, i);
+ }
+
+ cache.get(8);
+ cache.get(1);
+
+ List<Integer> evictions = new ArrayList<Integer>();
+ for (int i = 0; i < 10; i++) {
+ evictions.add(i);
+ }
+ //we expect natural order because the lru is converted into the sorted on natural key
+ assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), evictions);
+ }
+
+}
Property changes on:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestPartiallyOrderedCache.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain