Author: shawkins
Date: 2011-10-19 21:51:50 -0400 (Wed, 19 Oct 2011)
New Revision: 3571
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/build/kits/jboss-container/teiid-releasenotes.html
trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1750 lowering batch sizes, removing the small byte buffer slices in favor of
absolute positioning
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-10-19
15:42:21 UTC (rev 3570)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-10-20
01:51:50 UTC (rev 3571)
@@ -24,10 +24,10 @@
<property name="useDisk">true</property>
<!-- Directory location for the buffer files -->
<property
name="diskDirectory">${jboss.server.temp.dir}/teiid</property>
- <!-- The max row count of a batch sent internally within the query processor.
Should be <= the connectorBatchSize. (default 512) -->
- <property name="processorBatchSize">512</property>
- <!-- The max row count of a batch from a connector. Should be even multiple of
processorBatchSize. (default 1024) -->
- <property name="connectorBatchSize">1024</property>
+ <!-- The max row count of a batch sent internally within the query processor.
Should be <= the connectorBatchSize. (default 256) -->
+ <property name="processorBatchSize">256</property>
+ <!-- The max row count of a batch from a connector. Should be even multiple of
processorBatchSize. (default 512) -->
+ <property name="connectorBatchSize">512</property>
<!--
The approximate amount of buffer memory in kilobytes allowable for a single
processing operation (sort, grouping, etc.) regardless of existing memory commitments. -1
means to automatically calculate a value (default -1).
See the admin guide for more.
Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-10-19 15:42:21 UTC (rev
3570)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-10-20 01:51:50 UTC (rev
3571)
@@ -30,10 +30,10 @@
<LI><B>File Enhancements</B> - the file translator can now optionally
(via the ExceptionIfFileNotFound property) throw an exception if the path refers to a file
that doesn't exist. The file resource adapter can be configured to map file names and
can prevent parent path .. references. See the Admin Guide or the file-ds.xml template
for more.
<LI><B>TEXTTABLE Enhancements</B> - TEXTTABLE can now parse fixed
width files that do not use a row delimiter and can optionally produce fixed values that
haven't been trimmed.
<LI><B>Temp table transactions</B> - Internal materialized views and
temp table usage from a session and within procedures can take advantage of greater
transaction support.
- <LI><B>Buffering Improvements</B> - Added the ability to inline
memory based or small lobs and added tracking of the memory held by soft references. Also
switched to a LFRU algorithm that significantly reduces writes and read misses with
temporary tables.
+ <LI><B>Buffering Improvements</B> - Added the ability to inline
memory based or small lobs and added tracking of the memory held by soft references. Also
switched to a concurrent LFRU algorithm that significantly reduces writes and read misses
with temporary tables. Added a memory buffer to better handle file storage.
+ The memory buffer may be optional configured as off-heap for better large memory
performance.
<LI><B>GSSAPI</B> - both the Teiid JDBC client/server and the ODBC pg
backend can now support GSSAPI for single sign-on.
<LI><B>Server-side Query Timeouts</B> - default query timeouts can be
configured at both the VDB (via the query-timeout VDB property) and entire server (via the
teiid-jboss-beans.xml queryTimeout property).
- <LI><B>Memory Improvements</B> - buffering was optimized for
concurrency and to better handle table querying instead of tuple buffers. Added a memory
buffer to better handle file storage. The memory buffer may be optional configured as
off-heap for better large memory performance.
</UL>
<h2><a name="Compatibility">Compatibility
Issues</a></h2>
@@ -119,6 +119,11 @@
<h4>from 7.4</h4>
<ul>
+ <li>The configuration for the buffer service now defaults to 256/512 for
processor and connector batch sizes respectively. The buffer service also has 4 new
properties inlineLobs, memoryBufferSpace, memoryBufferOffHeap, and maxStorageObjectSize.
+</ul>
+
+<h4>from 7.4</h4>
+<ul>
<LI>The configuration for authorization has been moved off of the
RuntimeEngineDeployer bean and onto separate AuthorizationValidator and PolicyDecider
beans.
<LI>The configuration for the buffer manager has been simplified to refer to
memory sizes in KB, rather than batch columns.
</ul>
Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
===================================================================
---
trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml 2011-10-19
15:42:21 UTC (rev 3570)
+++
trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml 2011-10-20
01:51:50 UTC (rev 3571)
@@ -67,7 +67,7 @@
<title>Big Data/Memory</title>
<para>Usage of extremely large VM sizes and or datasets requires additional
considerations.
Teiid has a non-negligible amount of overhead per batch/table page on the order of
100-200 bytes. Depending on the data types involved each
- full batch/table page will represent between 64 and 4096 rows. If you are dealing
with datasets with billions of rows and you run into OutOfMemory issues, consider
increasing the processor
+ full batch/table page will represent a variable number of rows (a power of two
multiple above or below the processor batch size). If you are dealing with datasets with
billions of rows and you run into OutOfMemory issues, consider increasing the processor
batch size in the &jboss-beans; file to force the allocation of larger batches and
table pages. If the processor batch size is increased and/or you are dealing with
extremely wide result sets (several hundred columns),
then the default setting of 8MB for the maxStorageObjectSize in the &jboss-beans;
file may be too low. The sizing for maxStorageObjectSize is terms of serialized size,
which will be much
closer to the raw data size then the Java memory footprint estimation used for
maxReservedKB.
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-10-19
15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -61,8 +61,8 @@
NO_WAIT
}
- public static int DEFAULT_CONNECTOR_BATCH_SIZE = 1024;
- public static int DEFAULT_PROCESSOR_BATCH_SIZE = 512;
+ public static int DEFAULT_CONNECTOR_BATCH_SIZE = 512;
+ public static int DEFAULT_PROCESSOR_BATCH_SIZE = 256;
public static int DEFAULT_MAX_PROCESSING_KB = -1;
public static int DEFAULT_RESERVE_BUFFER_KB = -1;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-10-19 15:42:21 UTC
(rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-10-20 01:51:50 UTC
(rev 3571)
@@ -49,8 +49,6 @@
@SuppressWarnings("unchecked")
class SPage implements Cloneable {
- static final int MIN_PERSISTENT_SIZE = 16;
-
static class SearchResult {
int index;
SPage page;
@@ -186,7 +184,7 @@
if (values instanceof LightWeightCopyOnWriteList<?>) {
values = ((LightWeightCopyOnWriteList<List<?>>)values).getList();
}
- if (values.size() < MIN_PERSISTENT_SIZE) {
+ if (values.size() < stree.minPageSize) {
setDirectValues(values);
return;
} else if (stree.batchInsert && children == null && values.size() <
stree.leafSize) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-10-19 15:42:21 UTC
(rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-10-20 01:51:50 UTC
(rev 3571)
@@ -65,6 +65,7 @@
protected ListNestedSortComparator comparator;
private int pageSize;
protected int leafSize;
+ protected int minPageSize;
protected int keyLength;
protected boolean batchInsert;
protected SPage incompleteInsert;
@@ -86,7 +87,7 @@
manager.setPrefersMemory(true);
this.leafManager = leafManager;
this.comparator = comparator;
- this.pageSize = Math.max(pageSize, SPage.MIN_PERSISTENT_SIZE);
+ this.pageSize = pageSize;
pageSize >>>= 3;
while (pageSize > 0) {
pageSize >>>= 1;
@@ -97,6 +98,7 @@
this.leafSize = leafSize;
this.keyLength = keyLength;
this.lobManager = lobManager;
+ this.minPageSize = this.pageSize>>5;
}
public STree clone() {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java 2011-10-19
15:42:21 UTC (rev 3570)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -42,22 +42,12 @@
}
}
- private static class BlockInfo {
- final ByteBuffer bb;
- final int block;
- public BlockInfo(ByteBuffer bb, int block) {
- this.bb = bb;
- this.block = block;
- }
- }
-
private int blockAddressBits;
private int segmentAddressBits;
private int segmentSize;
private int blockSize;
private int blockCount;
private ThreadLocal<ByteBuffer>[] buffers;
- private BlockInfo[] bufferCache;
/**
* Creates a new {@link BlockByteBuffer} where each buffer segment will be
@@ -88,8 +78,6 @@
if (lastSegmentSize > 0) {
buffers[fullSegments] = new ThreadLocalByteBuffer(allocate(lastSegmentSize, direct));
}
- int logSize = 32 - Integer.numberOfLeadingZeros(blockCount);
- bufferCache = new BlockInfo[Math.min(logSize, 20)];
}
public static ByteBuffer allocate(int size, boolean direct) {
@@ -106,7 +94,7 @@
}
/**
- * Return a buffer containing the given start byte.
+ * Return a buffer positioned at the given start byte.
* It is assumed that the caller will handle blocks in
* a thread safe manner.
* @param startIndex
@@ -116,21 +104,12 @@
if (block < 0 || block >= blockCount) {
throw new IndexOutOfBoundsException("Invalid block " + block);
//$NON-NLS-1$
}
- int cacheIndex = block&(bufferCache.length -1);
- BlockInfo info = bufferCache[cacheIndex];
- if (info != null && info.block == block) {
- info.bb.rewind();
- return info.bb;
- }
int segment = block>>(segmentAddressBits-blockAddressBits);
ByteBuffer bb = buffers[segment].get();
- bb.limit(bb.capacity());
+ bb.rewind();
int position = (block<<blockAddressBits)&(segmentSize-1);
- bb.position(position);
bb.limit(position + blockSize);
- bb = bb.slice();
- info = new BlockInfo(bb, block);
- bufferCache[cacheIndex] = info;
+ bb.position(position);
return bb;
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-19
15:42:21 UTC (rev 3570)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -53,7 +53,6 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.ExecutorUtils;
-import org.teiid.core.util.ObjectConverterUtil;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
@@ -166,27 +165,27 @@
if (index >= MAX_INDIRECT) {
position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
ByteBuffer next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT, value,
mode);
- if (next != info) {
+ if (next != null) {
info = next;
//should have traversed to the secondary
int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
- position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
- if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_SIZE) {
+ position = info.position() + indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
info.limit()) {
info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
}
next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT +
indirectAddressBlock * ADDRESSES_PER_BLOCK, value, mode);
- if (next != info) {
+ if (next != null) {
info = next;
- position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
+ position = info.position() + ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) *
BYTES_PER_BLOCK_ADDRESS;
}
}
} else if (index >= DIRECT_POINTERS) {
//indirect
position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
ByteBuffer next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS,
value, mode);
- if (next != info) {
+ if (next != null) {
info = next;
- position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
+ position = next.position() + (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
}
} else {
position = BYTES_PER_BLOCK_ADDRESS*index;
@@ -194,7 +193,7 @@
if (mode == Mode.ALLOCATE) {
dataBlock = nextBlock(true);
info.putInt(position, dataBlock);
- if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_SIZE) {
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
info.limit()) {
//maintain the invariant that the next pointer is empty
info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
}
@@ -215,7 +214,7 @@
buf.putInt(position, sib_index);
} else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
freeDataBlock(sib_index);
- return buf;
+ return null;
}
}
return blockByteBuffer.getByteBuffer(sib_index);
@@ -273,7 +272,7 @@
ByteBuffer bb = getInodeBlock();
bb.putInt(EMPTY_ADDRESS);
}
- inodeBuffer = inodeByteBuffer.getByteBuffer(inode);
+ inodeBuffer = inodeByteBuffer.getByteBuffer(inode).slice();
}
return inodeBuffer;
}
@@ -299,7 +298,7 @@
if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
}
- bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock);
+ bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock).slice();
freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
freeDataBlock(doublyIndirectIndexBlock);
return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
@@ -307,7 +306,7 @@
private boolean freeIndirectBlock(int indirectIndexBlock) {
ByteBuffer bb = blockByteBuffer.getByteBuffer(indirectIndexBlock);
- boolean freedAll = freeBlock(0, bb, ADDRESSES_PER_BLOCK, true);
+ boolean freedAll = freeBlock(bb.position(), bb, ADDRESSES_PER_BLOCK, true);
freeDataBlock(indirectIndexBlock);
return freedAll;
}
@@ -409,7 +408,7 @@
if ((size>>1) >= maxStorageObjectSize) {
size>>=1; //adjust the last block size if needed
}
- stores.add(new BlockStore(this.storageManager, size, 30,
BufferManagerImpl.CONCURRENCY_LEVEL>>2));
+ stores.add(new BlockStore(this.storageManager, size, 15,
BufferManagerImpl.CONCURRENCY_LEVEL>>2));
size <<=2;
} while (size>>2 < maxStorageObjectSize);
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
@@ -449,20 +448,22 @@
if (!map.containsKey(entry.getId())) {
return true; //already removed
}
- info = new PhysicalInfo(s.getId(), entry.getId(), EMPTY_ADDRESS);
+ info = new PhysicalInfo(s.getId(), entry.getId(), EMPTY_ADDRESS,
(int)readAttempts.get());
+ info.adding = true;
map.put(entry.getId(), info);
}
}
}
if (!newEntry) {
synchronized (info) {
- if (info.inode == EMPTY_ADDRESS && info.block == EMPTY_ADDRESS) {
+ if (info.adding) {
return false; //someone else is responsible for adding this cache entry
}
if (info.evicting || info.inode != EMPTY_ADDRESS
|| !shouldPlaceInMemoryBuffer(0, info)) {
return true; //safe to remove from tier 1
}
+ info.adding = true;
//second chance re-add to the cache, we assume that serialization would be faster
than a disk read
}
}
@@ -492,19 +493,35 @@
synchronized (map) {
if (physicalMapping.containsKey(s.getId()) &&
map.containsKey(entry.getId())) {
synchronized (info) {
+ //set the size first, since it may raise an exceptional condition
+ info.setSize(bos.getBytesWritten());
info.inode = blockManager.getInode();
- info.setSize(bos.getBytesWritten());
- memoryBufferEntries.touch(info, newEntry);
+ memoryBufferEntries.add(info);
}
success = true;
}
}
} catch (Throwable e) {
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read batch "+ entry.getId() +" later will result in an
exception"); //$NON-NLS-1$ //$NON-NLS-2$
+ if (e == PhysicalInfo.sizeChanged) {
+ //System.out.println("size changed " + info.inode + " " +
info.block + " " + info);
+ //entries are mutable after adding, the original should be removed shortly so just
ignore
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId()
+" changed size since first persistence, keeping the original."); //$NON-NLS-1$
//$NON-NLS-2$
+ } else {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting, attempts
to read "+ entry.getId() +" later will result in an exception.");
//$NON-NLS-1$ //$NON-NLS-2$
+ }
} finally {
if (hasPermit) {
memoryWritePermits.release();
}
+ if (info != null) {
+ synchronized (info) {
+ info.adding = false;
+ if (!success && blockManager != null) {
+ //invalidate for safety
+ info.inode = EMPTY_ADDRESS;
+ }
+ }
+ }
if (!success && blockManager != null) {
blockManager.free(false);
}
@@ -558,23 +575,21 @@
if (serializer == null) {
return null;
}
- long currentTime = readAttempts.incrementAndGet();
InputStream is = null;
- boolean inStorage = false;
try {
synchronized (info) {
assert !info.pinned && info.loading; //load should be locked
await(info, true, false); //not necessary, but should make things safer
if (info.inode != EMPTY_ADDRESS) {
info.pinned = true;
- memoryBufferEntries.touch(info, false);
+ memoryBufferEntries.touch(info);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at
inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
}
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
} else if (info.block != EMPTY_ADDRESS) {
- inStorage = true;
+ memoryBufferEntries.recordAccess(info);
storageReads.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at
block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
@@ -587,28 +602,6 @@
return null;
}
}
- if (inStorage && shouldPlaceInMemoryBuffer(currentTime, info) &&
this.memoryWritePermits.tryAcquire()) {
- BlockManager manager = null;
- boolean success = false;
- try {
- manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
- ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
- ObjectConverterUtil.write(os, is, -1);
- synchronized (info) {
- assert !info.pinned;
- info.inode = manager.getInode();
- info.pinned = true;
- memoryBufferEntries.touch(info, false);
- is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
- }
- success = true;
- } finally {
- this.memoryWritePermits.release();
- if (!success && manager != null) {
- manager.free(false);
- }
- }
- }
ObjectInputStream ois = new ObjectInputStream(is);
CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), ois.readInt(),
serializer.deserialize(ois), ref, true);
return ce;
@@ -765,6 +758,9 @@
}
if (block != EMPTY_ADDRESS) {
if (demote) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Assigning storage data
block", block, "of size", sizeBasedStores[info.sizeIndex].blockSize);
//$NON-NLS-1$ //$NON-NLS-2$
+ }
info.block = block;
} else {
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
@@ -903,6 +899,9 @@
* Currently should be 48 bytes.
*/
final class PhysicalInfo extends BaseCacheEntry {
+
+ static final Exception sizeChanged = new Exception();
+
final Long gid;
//the memory inode and block count
int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
@@ -914,20 +913,26 @@
boolean pinned; //indicates that the entry is being read
boolean evicting; //indicates that the entry will be moved out of the memory buffer
boolean loading; //used by tier 1 cache to prevent double loads
+ boolean adding; //used to prevent double adds
- public PhysicalInfo(Long gid, Long id, int inode) {
- super(new CacheKey(id, 0, 0));
+ public PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
+ super(new CacheKey(id, lastAccess, 0));
this.inode = inode;
this.gid = gid;
}
- public void setSize(int size) {
- this.memoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) +
((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
- int blocks = memoryBlockCount;
- this.sizeIndex = 0;
- while (blocks >= 1) {
+ public void setSize(int size) throws Exception {
+ int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) +
((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
+ if (this.memoryBlockCount != 0) {
+ if (newMemoryBlockCount != memoryBlockCount) {
+ throw sizeChanged;
+ }
+ return; //no changes
+ }
+ this.memoryBlockCount = newMemoryBlockCount;
+ while (newMemoryBlockCount >= 1) {
this.sizeIndex++;
- blocks>>=2;
+ newMemoryBlockCount>>=2;
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-19
15:42:21 UTC (rev 3570)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -203,13 +203,13 @@
old = fastGet(previous, prefersMemory.get(), true);
}
}
- CacheKey key = new CacheKey(oid, 0, old!=null?old.getKey().getOrderingValue():0);
+ CacheKey key = new CacheKey(oid, (int)readAttempts.get(),
old!=null?old.getKey().getOrderingValue():0);
CacheEntry ce = new CacheEntry(key, sizeEstimate, batch, this.ref, false);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE))
{
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to
BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate());
//$NON-NLS-1$ //$NON-NLS-2$
}
cache.addToCacheGroup(id, ce.getId());
- addMemoryEntry(ce);
+ addMemoryEntry(ce, true);
return oid;
}
@@ -285,7 +285,7 @@
cache.remove(this.id, batch);
}
if (retain) {
- addMemoryEntry(ce);
+ addMemoryEntry(ce, false);
}
} finally {
cache.unlockForLoad(o);
@@ -693,8 +693,10 @@
if (ce == null) {
break;
}
- if (!memoryEntries.containsKey(ce.getId())) {
- continue; //not currently a valid eviction
+ synchronized (ce) {
+ if (!memoryEntries.containsKey(ce.getId())) {
+ continue; //not currently a valid eviction
+ }
}
boolean evicted = true;
try {
@@ -763,7 +765,7 @@
//there is a minute chance the batch was evicted
//this call ensures that we won't leak
if (memoryEntries.containsKey(batch)) {
- evictionQueue.touch(ce, false);
+ evictionQueue.touch(ce);
}
} else {
evictionQueue.remove(ce);
@@ -791,7 +793,7 @@
if (ce != null && ce.getObject() != null) {
referenceHit.getAndIncrement();
if (retain) {
- addMemoryEntry(ce);
+ addMemoryEntry(ce, false);
} else {
BufferManagerImpl.this.remove(ce, false);
}
@@ -823,11 +825,15 @@
}
}
- void addMemoryEntry(CacheEntry ce) {
+ void addMemoryEntry(CacheEntry ce, boolean initial) {
persistBatchReferences();
synchronized (ce) {
memoryEntries.put(ce.getId(), ce);
- evictionQueue.touch(ce, true);
+ if (initial) {
+ evictionQueue.add(ce);
+ } else {
+ evictionQueue.touch(ce);
+ }
}
activeBatchKB.getAndAdd(ce.getSizeEstimate());
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-19
15:42:21 UTC (rev 3570)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -30,6 +30,7 @@
protected ByteBuffer buf;
protected int bytesWritten;
+ private int startPosition;
public ExtensibleBufferedOutputStream() {
}
@@ -45,6 +46,7 @@
private void ensureBuffer() {
if (buf == null) {
buf = newBuffer();
+ startPosition = buf.position();
}
}
@@ -63,8 +65,11 @@
}
public void flush() throws IOException {
- if (buf != null && buf.position() > 0) {
- bytesWritten += flushDirect(buf.position());
+ if (buf != null) {
+ int bytes = buf.position() - startPosition;
+ if (bytes > 0) {
+ bytesWritten += flushDirect(bytes);
+ }
}
buf = null;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-19
15:42:21 UTC (rev 3570)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -60,11 +60,13 @@
return evictionQueue.remove(value.getKey()) != null;
}
- public void touch(V value, boolean initial) {
- if (!initial) {
- initial = evictionQueue.remove(value.getKey()) == null;
- }
- recordAccess(value, initial);
+ public boolean add(V value) {
+ return evictionQueue.put(value.getKey(), value) == null;
+ }
+
+ public void touch(V value) {
+ evictionQueue.remove(value.getKey());
+ recordAccess(value);
evictionQueue.put(value.getKey(), value);
}
@@ -85,14 +87,13 @@
return null;
}
- protected void recordAccess(V value, boolean initial) {
- assert Thread.holdsLock(value);
+ /**
+ * Callers should be synchronized on value
+ */
+ public void recordAccess(V value) {
CacheKey key = value.getKey();
int lastAccess = key.getLastAccess();
long currentClock = clock.get();
- if (initial && lastAccess == 0) {
- return; //we just want to timestamp this as created and not give it an ordering value
- }
float orderingValue = key.getOrderingValue();
orderingValue = computeNextOrderingValue(currentClock, lastAccess,
orderingValue);
@@ -128,7 +129,7 @@
break;
}
}
- this.maxInterval = i-1;
+ this.maxInterval = 1<<(i-1);
}
}
Modified:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-19
15:42:21 UTC (rev 3570)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -90,6 +90,16 @@
ce = get(cache, 3l, s);
assertEquals(cacheObject, ce.getObject());
+ //repeat the test to ensure proper cleanup
+ ce = new CacheEntry(4l);
+ cacheObject = Integer.valueOf(60000);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+
+ ce = get(cache, 4l, s);
+ assertEquals(cacheObject, ce.getObject());
+
cache.removeCacheGroup(1l);
assertEquals(0, cache.getDataBlocksInUse());
Modified:
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
---
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-10-19
15:42:21 UTC (rev 3570)
+++
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -94,8 +94,8 @@
svc.start();
BufferManager mgr = svc.getBufferManager();
- assertEquals(6570, mgr.getSchemaSize(schema));
- assertEquals(256, mgr.getProcessorBatchSize(schema));
+ assertEquals(3285, mgr.getSchemaSize(schema));
+ assertEquals(128, mgr.getProcessorBatchSize(schema));
}
}
Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-10-19
15:42:21 UTC (rev 3570)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-10-20
01:51:50 UTC (rev 3571)
@@ -130,6 +130,7 @@
BufferServiceImpl bsi = new BufferServiceImpl();
bsi.setDiskDirectory(UnitTestUtil.getTestScratchPath());
this.dqp.setBufferService(bsi);
+ bsi.start();
}
this.dqp.setCacheFactory(new DefaultCacheFactory());