[teiid-commits] teiid SVN: r3555 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 3 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Mon Oct 17 23:37:08 EDT 2011
Author: shawkins
Date: 2011-10-17 23:37:06 -0400 (Mon, 17 Oct 2011)
New Revision: 3555
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
Log:
TEIID-1750 correcting a threading issue with reads during an evict and switching to mostly non-blocking reservations TEIID-1784 correcting inital sort incomplete reservations
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-10-18 03:37:06 UTC (rev 3555)
@@ -52,15 +52,11 @@
public enum BufferReserveMode {
/**
- * Claim unused buffers up to the amount requested, using a progressive decaying algorithm
- */
- WAIT,
- /**
* Claim all of the buffers requested, even if they are not available, without waiting
*/
FORCE,
/**
- * Claim unused buffers up to the amount requested witout waiting
+ * Claim unused buffers up to the amount requested without waiting
*/
NO_WAIT
}
@@ -135,5 +131,12 @@
* Set the maxActivePlans as a hint at determining the maxProcessingKB
* @param maxActivePlans
*/
- void setMaxActivePlans(int maxActivePlans);
+ void setMaxActivePlans(int maxActivePlans);
+
+ /**
+ * Wait for additional buffers to become available.
+ * @param additional
+ * @return
+ */
+ int reserveAdditionalBuffers(int additional);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java 2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java 2011-10-18 03:37:06 UTC (rev 3555)
@@ -42,6 +42,10 @@
this.persistent = persistent;
}
+ public void setObject(Object object) {
+ this.object = object;
+ }
+
public int getSizeEstimate() {
return sizeEstimate;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-18 03:37:06 UTC (rev 3555)
@@ -34,10 +34,12 @@
int blockIndex;
ByteBuffer buf;
boolean done;
+ private final boolean threadSafe;
- BlockInputStream(BlockManager manager, int blockCount) {
+ BlockInputStream(BlockManager manager, int blockCount, boolean threadSafe) {
this.manager = manager;
this.maxBlock = blockCount;
+ this.threadSafe = threadSafe;
}
@Override
@@ -56,6 +58,9 @@
return;
}
buf = manager.getBlock(blockIndex++);
+ if (threadSafe) {
+ buf = buf.duplicate();
+ }
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-18 03:37:06 UTC (rev 3555)
@@ -241,8 +241,8 @@
memoryEvictionLock.readLock().unlock();
}
}
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_DQP, "Allocating", data?"data":"index", "block", next, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Allocating", data?"data":"index", "block", next, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$ //$NON-NLS-5$
}
return next;
}
@@ -254,8 +254,8 @@
}
private void freeDataBlock(int dataBlock) {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_DQP, "freeing data block", dataBlock, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "freeing data block", dataBlock, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
}
blocksInuse.clear(dataBlock);
}
@@ -267,8 +267,8 @@
if (this.inode == -1) {
throw new AssertionError("Out of inodes"); //$NON-NLS-1$
}
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Allocating inode", this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating inode", this.inode, "to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
}
ByteBuffer bb = getInodeBlock();
bb.putInt(EMPTY_ADDRESS);
@@ -288,8 +288,8 @@
int indirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
int doublyIndirectIndexBlock = bb.getInt(BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
boolean freedAll = freeBlock(acquire?BYTES_PER_BLOCK_ADDRESS:0, bb, DIRECT_POINTERS-(acquire?1:0), true);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
}
inodesInuse.clear(inode);
if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
@@ -427,8 +427,8 @@
@SuppressWarnings("unchecked")
@Override
public boolean add(CacheEntry entry, Serializer s) {
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "adding object", s.getId(), entry.getId()); //$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "adding object", s.getId(), entry.getId()); //$NON-NLS-1$
}
boolean newEntry = false;
InodeBlockManager blockManager = null;
@@ -469,7 +469,7 @@
//proactively create freespace
if (!cleanerRunning.get()) {
if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Starting memory buffer cleaner"); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer cleaner"); //$NON-NLS-1$
asynchPool.execute(cleaningTask);
if (lowBlocks(true)) {
//do a non-blocking removal before we're forced to block
@@ -563,21 +563,21 @@
boolean inStorage = false;
try {
synchronized (info) {
- await(info, true, false);
+ assert !info.pinned && info.loading; //load should be locked
+ await(info, true, false); //not necessary, but should make things safer
if (info.inode != EMPTY_ADDRESS) {
info.pinned = true;
memoryBufferEntries.touch(info, false);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
}
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
- is = new BlockInputStream(manager, info.memoryBlockCount);
+ is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
} else if (info.block != EMPTY_ADDRESS) {
- assert !info.pinned;
inStorage = true;
storageReads.incrementAndGet();
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
}
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
@@ -595,19 +595,17 @@
ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
ObjectConverterUtil.write(os, is, -1);
synchronized (info) {
+ assert !info.pinned;
info.inode = manager.getInode();
info.pinned = true;
memoryBufferEntries.touch(info, false);
+ is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
}
- is = new BlockInputStream(manager, info.memoryBlockCount);
success = true;
} finally {
this.memoryWritePermits.release();
if (!success && manager != null) {
manager.free(false);
- synchronized (info) {
- info.inode = EMPTY_ADDRESS;
- }
}
}
}
@@ -707,10 +705,10 @@
synchronized (info) {
//if we're a demotion then the free flag was already checked and set
if (!demote) {
- //let a pending free finish - it would be nice if we could pre-empt
+ //let any pending finish - it would be nice if we could pre-empt
//since we can save some work, but this should be rare enough
//to just block
- await(info, false, true);
+ await(info, true, true);
info.evicting = true;
} else {
assert info.evicting;
@@ -729,11 +727,11 @@
try {
if (demote && block == EMPTY_ADDRESS) {
storageWrites.getAndIncrement();
- BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
+ BlockInputStream is = new BlockInputStream(bm, memoryBlockCount, false); //we know this can always be single threaded
BlockStore blockStore = sizeBasedStores[sizeIndex];
block = getAndSetNextClearBit(blockStore);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Allocating storage data block", info.block, "of size", blockStore.blockSize, "to", info); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-1$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating storage data block", info.block, "of size", blockStore.blockSize, "to", info); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
FileStore fs = blockStore.stores[block/blockStore.blocksInUse.getBitsPerSegment()];
long blockOffset = (block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
@@ -748,7 +746,7 @@
//shouldn't happen, but we'll invalidate this write and continue
demote = false;
//just continue to free
- LogManager.logError(LogConstants.CTX_DQP, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
}
}
} finally {
@@ -756,6 +754,7 @@
synchronized (info) {
//it is possible for a read to happen while evicting.
//that's ok, we'll just wait for it to finish
+ assert info.evicting;
await(info, true, false);
info.evicting = false;
info.notifyAll();
@@ -770,8 +769,8 @@
} else {
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
blockStore.blocksInUse.clear(info.block);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Freed storage data block", info.block, "of size", blockStore.blockSize); //$NON-NLS-1$ //$NON-NLS-2$
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Freed storage data block", info.block, "of size", blockStore.blockSize); //$NON-NLS-1$ //$NON-NLS-2$
}
info.block = EMPTY_ADDRESS;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-18 03:37:06 UTC (rev 3555)
@@ -84,6 +84,10 @@
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
+ /**
+ * Asynch cleaner attempts to age out old entries and to reduce the memory size when
+ * little is reserved.
+ */
private static final class Cleaner extends TimerTask {
WeakReference<BufferManagerImpl> bufferRef;
@@ -99,7 +103,8 @@
this.cancel();
return;
}
- if (impl.reserveBatchKB.get() < impl.maxReserveKB.get()*.9 || impl.activeBatchKB.get() < (impl.maxReserveKB.get()>>3)) {
+ boolean agingOut = false;
+ if (impl.reserveBatchKB.get() < impl.maxReserveKB.get()*.9 || impl.activeBatchKB.get() < impl.maxReserveKB.get()*.7) {
CacheEntry entry = impl.evictionQueue.firstEntry(false);
if (entry == null) {
return;
@@ -114,8 +119,19 @@
if (currentTime - lastAccess < 1<<28) {
return;
}
+ agingOut = true;
}
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchKB.get(), impl.maxReserveKB.get(), impl.activeBatchKB.get()); //$NON-NLS-1$
+ }
impl.doEvictions(0, false);
+ if (!agingOut) {
+ try {
+ Thread.sleep(100); //we don't want to evict too fast, because the processing threads are more than capable of evicting
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
}
}
}
@@ -335,6 +351,12 @@
LrfuEvictionQueue<CacheEntry> evictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
ConcurrentHashMap<Long, CacheEntry> memoryEntries = new ConcurrentHashMap<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL);
+ private ThreadLocal<Integer> reservedByThread = new ThreadLocal<Integer>() {
+ protected Integer initialValue() {
+ return 0;
+ }
+ };
+
//limited size reference caches based upon the memory settings
private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache;
private Map<Long, BatchSoftReference> softCache = Collections.synchronizedMap(new LinkedHashMap<Long, BatchSoftReference>(16, .75f, false) {
@@ -343,13 +365,12 @@
protected boolean removeEldestEntry(Map.Entry<Long,BatchSoftReference> eldest) {
if (size() > maxSoftReferences) {
BatchSoftReference bsr = eldest.getValue();
- maxReserveKB.addAndGet(bsr.sizeEstimate);
- reserveBatchKB.addAndGet(bsr.sizeEstimate);
- bsr.clear();
+ clearSoftReference(bsr);
return true;
}
return false;
- };
+ }
+
});
private Cache cache;
@@ -366,9 +387,18 @@
private static final Timer timer = new Timer("BufferManager Cleaner", true); //$NON-NLS-1$
public BufferManagerImpl() {
- //timer.schedule(new Cleaner(this), 30000, 30000);
+ timer.schedule(new Cleaner(this), 15000, 15000);
}
+ void clearSoftReference(BatchSoftReference bsr) {
+ synchronized (bsr) {
+ maxReserveKB.addAndGet(bsr.sizeEstimate);
+ reserveBatchKB.addAndGet(bsr.sizeEstimate);
+ bsr.sizeEstimate = 0;
+ }
+ bsr.clear();
+ }
+
public long getBatchesAdded() {
return batchAdded.get();
}
@@ -389,6 +419,10 @@
public int getMaxProcessingKB() {
return maxProcessingKB;
}
+
+ public int getReserveBatchKB() {
+ return reserveBatchKB.get();
+ }
/**
* Get processor batch size
@@ -551,49 +585,88 @@
if (count < 1) {
return;
}
+ reservedByThread.set(reservedByThread.get() - count);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer space", count); //$NON-NLS-1$
}
+ if (lock.tryLock()) {
+ try {
+ this.reserveBatchKB.addAndGet(count);
+ batchesFreed.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ } else {
+ this.reserveBatchKB.addAndGet(count);
+ }
+ }
+
+ /**
+ * TODO: should consider other reservations by the current thread
+ */
+ @Override
+ public int reserveAdditionalBuffers(int additional) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", additional, "WAIT"); //$NON-NLS-1$ //$NON-NLS-2$
+ }
lock.lock();
try {
- this.reserveBatchKB.addAndGet(count);
- batchesFreed.signalAll();
+ //don't wait for more than is available
+ int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
+ int committed = 0;
+ while (waitCount > 0 && waitCount > this.reserveBatchKB.get() && committed < additional) {
+ int reserveBatchSample = this.reserveBatchKB.get();
+ try {
+ batchesFreed.await(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ if (reserveBatchSample >= this.reserveBatchKB.get()) {
+ waitCount >>= 3;
+ } else {
+ waitCount >>= 1;
+ }
+ int result = noWaitReserve(additional - committed);
+ committed += result;
+ }
+ return committed;
} finally {
lock.unlock();
+ persistBatchReferences();
}
- }
+ }
@Override
public int reserveBuffers(int count, BufferReserveMode mode) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, mode); //$NON-NLS-1$
}
- lock.lock();
- try {
- if (mode == BufferReserveMode.WAIT) {
- //don't wait for more than is available
- int waitCount = Math.min(count, this.getMaxReserveKB());
- while (waitCount > 0 && waitCount > this.reserveBatchKB.get()) {
- try {
- batchesFreed.await(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new TeiidRuntimeException(e);
- }
- waitCount /= 2;
- }
- }
- if (this.reserveBatchKB.get() >= count || mode == BufferReserveMode.FORCE) {
- this.reserveBatchKB.addAndGet(-count);
- return count;
- }
- int result = Math.max(0, this.reserveBatchKB.get());
- this.reserveBatchKB.addAndGet(-result);
- return result;
- } finally {
- lock.unlock();
- persistBatchReferences();
+ int result = count;
+ if (mode == BufferReserveMode.FORCE) {
+ this.reserveBatchKB.addAndGet(-count);
+ } else {
+ result = noWaitReserve(count);
}
+ reservedByThread.set(reservedByThread.get() + result);
+ persistBatchReferences();
+ return result;
}
+
+ private int noWaitReserve(int count) {
+ for (int i = 0; i < 2; i++) {
+ int reserveBatch = this.reserveBatchKB.get();
+ count = Math.min(count, Math.max(0, reserveBatch));
+ if (count == 0) {
+ return 0;
+ }
+ if (this.reserveBatchKB.compareAndSet(reserveBatch, reserveBatch - count)) {
+ return count;
+ }
+ }
+ //the value is changing rapidly, but we've already potentially adjusted the value twice, so just proceed
+ this.reserveBatchKB.addAndGet(-count);
+ return count;
+ }
void persistBatchReferences() {
int activeBatch = activeBatchKB.get();
@@ -706,8 +779,7 @@
if (bsr != null) {
ce = bsr.get();
if (ce != null) {
- maxReserveKB.addAndGet(bsr.sizeEstimate);
- reserveBatchKB.addAndGet(bsr.sizeEstimate);
+ clearSoftReference(bsr);
}
}
} else if (useWeakReferences) {
@@ -776,9 +848,7 @@
break;
}
softCache.remove(ref.key);
- maxReserveKB.addAndGet(ref.sizeEstimate);
- reserveBatchKB.addAndGet(ref.sizeEstimate);
- ref.clear();
+ clearSoftReference(ref);
}
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-10-18 03:37:06 UTC (rev 3555)
@@ -234,10 +234,10 @@
if (workingTuples.size() >= maxRows) {
int reserved = bufferManager.reserveBuffers(schemaSize,
(totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingKB())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
+ totalReservedBuffers += reserved;
if (reserved != schemaSize) {
break;
}
- totalReservedBuffers += reserved;
maxRows += this.batchSize;
}
try {
@@ -301,7 +301,7 @@
int reserved = Math.min(desiredSpace, this.bufferManager.getMaxProcessingKB());
bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
if (desiredSpace > reserved) {
- reserved += bufferManager.reserveBuffers(desiredSpace - reserved, BufferReserveMode.WAIT);
+ reserved += bufferManager.reserveAdditionalBuffers(desiredSpace - reserved);
}
int maxSortIndex = Math.max(2, reserved / schemaSize); //always allow progress
//release any partial excess
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-18 03:37:06 UTC (rev 3555)
@@ -131,7 +131,7 @@
private CacheEntry get(BufferFrontedFileStoreCache cache, Long oid,
Serializer<Integer> s) throws TeiidComponentException {
PhysicalInfo o = cache.lockForLoad(oid, s);
- CacheEntry ce = cache.get(o, oid, s);
+ CacheEntry ce = cache.get(o, oid, new WeakReference<Serializer<?>>(s));
cache.unlockForLoad(o);
return ce;
}
Modified: trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java 2011-10-17 18:49:11 UTC (rev 3554)
+++ trunk/engine/src/test/java/org/teiid/query/processor/relational/TestSortNode.java 2011-10-18 03:37:06 UTC (rev 3555)
@@ -37,6 +37,7 @@
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
@@ -52,7 +53,8 @@
public static final int BATCH_SIZE = 100;
private void helpTestSort(List elements, List[] data, List sortElements, List sortTypes, List[] expected, Mode mode) throws TeiidComponentException, TeiidProcessingException {
- BufferManager mgr = BufferManagerFactory.getTestBufferManager(100, BATCH_SIZE, BATCH_SIZE);
+ BufferManagerImpl mgr = BufferManagerFactory.getTestBufferManager(10000, BATCH_SIZE, BATCH_SIZE);
+ int reserve = mgr.getReserveBatchKB();
CommandContext context = new CommandContext ("pid", "test", null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
BlockingFakeRelationalNode dataNode = new BlockingFakeRelationalNode(2, data);
@@ -87,6 +89,7 @@
}
}
assertEquals(expected.length, currentRow - 1);
+ assertEquals(reserve, mgr.getReserveBatchKB());
}
/*
More information about the teiid-commits
mailing list