Author: shawkins
Date: 2011-10-22 07:40:30 -0400 (Sat, 22 Oct 2011)
New Revision: 3576
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
Log:
TEIID-1750 further refining serialization and adding defrag
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public abstract class ExtensibleBufferedInputStream extends InputStream {
+ ByteBuffer buf;
+
+ @Override
+ public int read() throws IOException {
+ if (!ensureBytes()) {
+ return -1;
+ }
+ return buf.get() & 0xff;
+ }
+
+ private boolean ensureBytes() throws IOException {
+ if (buf == null || buf.remaining() == 0) {
+ buf = nextBuffer();
+ if (buf == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected abstract ByteBuffer nextBuffer() throws IOException;
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (!ensureBytes()) {
+ return -1;
+ }
+ len = Math.min(len, buf.remaining());
+ buf.get(b, off, len);
+ return len;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (buf != null) {
+ buf.rewind();
+ }
+ }
+
+}
\ No newline at end of file
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-10-21 20:18:54
UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-10-22 11:40:30
UTC (rev 3576)
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -189,36 +190,31 @@
protected abstract void removeDirect();
public InputStream createInputStream(final long start, final long length) {
- return new InputStream() {
+ return new ExtensibleBufferedInputStream() {
private long offset = start;
private long streamLength = length;
+ private ByteBuffer bb = ByteBuffer.allocate(1<<13);
@Override
- public int read() throws IOException {
- byte[] buffer = new byte[1];
- int read = read(buffer, 0, 1);
- if (read == -1) {
- return -1;
- }
- return buffer[0];
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
+ protected ByteBuffer nextBuffer() throws IOException {
+ int len = bb.capacity();
if (this.streamLength != -1 && len > this.streamLength) {
len = (int)this.streamLength;
}
if (this.streamLength == -1 || this.streamLength > 0) {
- int bytes = FileStore.this.read(offset, b, off, len);
- if (bytes != -1) {
- this.offset += bytes;
- if (this.streamLength != -1) {
- this.streamLength -= bytes;
- }
+ int bytes = FileStore.this.read(offset, bb.array(), 0, len);
+ if (bytes == -1) {
+ return null;
}
- return bytes;
+ bb.rewind();
+ bb.limit(bytes);
+ this.offset += bytes;
+ if (this.streamLength != -1) {
+ this.streamLength -= bytes;
+ }
+ return bb;
}
- return -1;
+ return null;
}
};
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -22,7 +22,6 @@
package org.teiid.common.buffer;
-import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -50,8 +49,7 @@
if (fsos != null && !fsos.bytesWritten()) {
return new ByteArrayInputStream(fsos.getBuffer(), 0, fsos.getCount());
}
- //TODO: adjust the buffer size, and/or develop a shared buffer strategy
- return new BufferedInputStream(lobBuffer.createInputStream(0));
+ return lobBuffer.createInputStream(0);
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java 2011-10-21 20:18:54
UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java 2011-10-22 11:40:30
UTC (rev 3576)
@@ -23,16 +23,16 @@
package org.teiid.common.buffer;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
/**
* Responsible for serializing {@link CacheEntry}s
* @param <T>
*/
public interface Serializer<T> {
- void serialize(T obj, ObjectOutputStream oos) throws IOException;
- T deserialize(ObjectInputStream ois) throws IOException, ClassNotFoundException;
+ void serialize(T obj, ObjectOutput oos) throws IOException;
+ T deserialize(ObjectInput ois) throws IOException, ClassNotFoundException;
boolean useSoftCache();
Long getId();
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -22,18 +22,17 @@
package org.teiid.common.buffer.impl;
-import java.io.InputStream;
import java.nio.ByteBuffer;
+import org.teiid.common.buffer.ExtensibleBufferedInputStream;
+
/**
* TODO: support freeing of datablocks as we go
*/
-final class BlockInputStream extends InputStream {
+final class BlockInputStream extends ExtensibleBufferedInputStream {
private final BlockManager manager;
private final int maxBlock;
int blockIndex;
- ByteBuffer buf;
- boolean done;
BlockInputStream(BlockManager manager, int blockCount) {
this.manager = manager;
@@ -41,33 +40,11 @@
}
@Override
- public int read() {
- ensureBytes();
- if (done) {
- return -1;
+ protected ByteBuffer nextBuffer() {
+ if (maxBlock == blockIndex) {
+ return null;
}
- return buf.get() & 0xff;
+ return manager.getBlock(blockIndex++);
}
-
- private void ensureBytes() {
- if (buf == null || buf.remaining() == 0) {
- if (maxBlock == blockIndex) {
- done = true;
- return;
- }
- buf = manager.getBlock(blockIndex++);
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) {
- ensureBytes();
- if (done) {
- return -1;
- }
- len = Math.min(len, buf.remaining());
- buf.get(b, off, len);
- return len;
- }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-21
20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -22,8 +22,16 @@
package org.teiid.common.buffer.impl;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.StorageManager;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
/**
* Represents a FileStore that holds blocks of a fixed size.
@@ -32,6 +40,7 @@
final long blockSize;
final ConcurrentBitSet blocksInUse;
final FileStore[] stores;
+ final ReentrantReadWriteLock[] locks;
public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog, int
concurrencyLevel) {
this.blockSize = blockSize;
@@ -39,9 +48,50 @@
this.blocksInUse = new ConcurrentBitSet(blockCount, concurrencyLevel);
this.blocksInUse.setCompact(true);
this.stores = new FileStore[concurrencyLevel];
+ this.locks = new ReentrantReadWriteLock[concurrencyLevel];
for (int i = 0; i < stores.length; i++) {
- this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) +
'_' + i);
+ this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) +
'_' + i);
+ this.locks[i] = new ReentrantReadWriteLock();
}
+
}
+
+ int getAndSetNextClearBit(PhysicalInfo info) {
+ int result = blocksInUse.getAndSetNextClearBit();
+ if (result == -1) {
+ throw new TeiidRuntimeException("Out of blocks of size " + blockSize);
//$NON-NLS-1$
+ }
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL))
{
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating storage data
block", result, "of size", blockSize, "to", info); //$NON-NLS-1$
//$NON-NLS-2$ //$NON-NLS-3$
+ }
+ return result;
+ }
+
+ int writeToStorageBlock(PhysicalInfo info,
+ InputStream is) throws IOException {
+ int block = getAndSetNextClearBit(info);
+ int segment = block/blocksInUse.getBitsPerSegment();
+ boolean success = false;
+ //we're using the read lock here so that defrag can lock the write out
+ locks[segment].readLock().lock();
+ try {
+ FileStore fs = stores[segment];
+ long blockOffset = (block%blocksInUse.getBitsPerSegment())*blockSize;
+ byte[] b = new byte[BufferFrontedFileStoreCache.BLOCK_SIZE];
+ int read = 0;
+ while ((read = is.read(b, 0, b.length)) != -1) {
+ fs.write(blockOffset, b, 0, read);
+ blockOffset+=read;
+ }
+ success = true;
+ } finally {
+ locks[segment].readLock().unlock();
+ if (!success) {
+ blocksInUse.clear(block);
+ block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+ }
+ }
+ return block;
+ }
}
\ No newline at end of file
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -22,10 +22,11 @@
package org.teiid.common.buffer.impl;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -89,17 +90,14 @@
* The root directory "physicalMapping" is held in memory for performance. It
will grow in
* proportion to the number of tables/tuplebuffers in use.
*
- * TODO: compact tail storage blocks. there may be dangling blocks causing us to consume
disk space.
- * we should at least reclaim tail space if the end block is removed. for now we are
just relying
- * on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of
the
- * files.
- *
* The locking is as fine grained as possible to prevent contention. See {@link
PhysicalInfo} for
* flags that are used when it is used as a lock. It is important to not access the
* group maps when a {@link PhysicalInfo} lock is held.
*/
public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>,
StorageManager {
+ private static final int DEFAULT_MIN_DEFRAG = 1 << 23;
+ private static final byte[] HEADER_SKIP_BUFFER = new byte[16];
private static final int EVICTION_SCANS = 5;
public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
@@ -355,9 +353,85 @@
//root directory
private ConcurrentHashMap<Long, Map<Long, PhysicalInfo>> physicalMapping =
new ConcurrentHashMap<Long, Map<Long, PhysicalInfo>>(16, .75f,
BufferManagerImpl.CONCURRENCY_LEVEL);
private BlockStore[] sizeBasedStores;
-
+
+ private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(2, "FileStore
Worker"); //$NON-NLS-1$
+ private AtomicBoolean defragRunning = new AtomicBoolean();
+ //defrag to release freespace held by storage files
+ private final Runnable defragTask = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Running defrag");
//$NON-NLS-1$
+ }
+ for (int i = 0; i < sizeBasedStores.length; i++) {
+ BlockStore blockStore = sizeBasedStores[i];
+ for (int segment = 0; segment < blockStore.stores.length; segment++) {
+ if (!shouldDefrag(blockStore, segment)) {
+ continue;
+ }
+ try {
+ do {
+ int blockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
+ if (!shouldDefrag(blockStore, segment)) {
+ //truncate the file
+ blockStore.locks[segment].writeLock().lock();
+ try {
+ int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
+ long length = endBlock * blockStore.blockSize;
+ blockStore.stores[segment].setLength(length);
+ } finally {
+ blockStore.locks[segment].writeLock().unlock();
+ }
+ break;
+ }
+ //move the block if possible
+ InputStream is = blockStore.stores[segment].createInputStream(blockToMove *
blockStore.blockSize);
+ DataInputStream dis = new DataInputStream(is);
+ Long gid = dis.readLong();
+ Long oid = dis.readLong();
+ dis.reset(); //move back to the beginning
+ Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
+ if (map == null) {
+ continue;
+ }
+ PhysicalInfo info = map.get(oid);
+ if (info == null) {
+ continue;
+ }
+ synchronized (info) {
+ await(info, true, false);
+ if (info.block == EMPTY_ADDRESS) {
+ continue;
+ }
+ assert info.block == blockToMove;
+ }
+ int newBlock = blockStore.writeToStorageBlock(info, dis);
+ synchronized (info) {
+ await(info, true, true);
+ if (info.block == EMPTY_ADDRESS) {
+ //already removed;
+ if (newBlock != EMPTY_ADDRESS) {
+ blockStore.blocksInUse.clear(newBlock);
+ }
+ continue;
+ }
+ info.block = newBlock;
+ blockStore.blocksInUse.clear(blockToMove);
+ }
+ } while (shouldDefrag(blockStore, segment));
+ } catch (IOException e) {
+ LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing
defrag"); //$NON-NLS-1$
+ }
+ }
+ }
+ } finally {
+ defragRunning.set(false);
+ }
+ }
+ };
private AtomicBoolean cleanerRunning = new AtomicBoolean();
- private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, "FileStore
Worker"); //$NON-NLS-1$
private final Runnable cleaningTask = new Runnable() {
@Override
@@ -379,6 +453,8 @@
private AtomicLong storageWrites = new AtomicLong();
private AtomicLong storageReads = new AtomicLong();
+ private long minDefrag = DEFAULT_MIN_DEFRAG;
+
@Override
public void initialize() throws TeiidComponentException {
storageManager.initialize();
@@ -482,10 +558,12 @@
hasPermit = true;
blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
BlockOutputStream bos = new BlockOutputStream(blockManager);
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeInt(entry.getSizeEstimate());
- s.serialize(entry.getObject(), oos);
- oos.close();
+ ObjectOutput dos = new DataObjectOutputStream(bos);
+ dos.writeLong(s.getId());
+ dos.writeLong(entry.getId());
+ dos.writeInt(entry.getSizeEstimate());
+ s.serialize(entry.getObject(), dos);
+ dos.close();
//synchronized to ensure proper cleanup from a concurrent removal
synchronized (map) {
if (physicalMapping.containsKey(s.getId()) &&
map.containsKey(entry.getId())) {
@@ -585,6 +663,7 @@
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
is = new BlockInputStream(manager, info.memoryBlockCount);
} else if (info.block != EMPTY_ADDRESS) {
+ info.pinned = true;
memoryBufferEntries.recordAccess(info);
storageReads.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
@@ -598,8 +677,10 @@
return null;
}
}
- ObjectInputStream ois = new ObjectInputStream(is);
- CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), ois.readInt(),
serializer.deserialize(ois), ref, true);
+ ObjectInput dis = new DataObjectInputStream(is);
+ dis.readFully(HEADER_SKIP_BUFFER);
+ int sizeEstimate = dis.readInt();
+ CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), sizeEstimate,
serializer.deserialize(dis), ref, true);
return ce;
} catch(IOException e) {
throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
@@ -718,26 +799,10 @@
storageWrites.getAndIncrement();
BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
BlockStore blockStore = sizeBasedStores[sizeIndex];
- block = getAndSetNextClearBit(blockStore);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating storage data
block", info.block, "of size", blockStore.blockSize, "to", info);
//$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- }
- FileStore fs = blockStore.stores[block/blockStore.blocksInUse.getBitsPerSegment()];
- long blockOffset =
(block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
- byte[] b = new byte[BLOCK_SIZE];
- int read = 0;
- try {
- while ((read = is.read(b, 0, b.length)) != -1) {
- fs.write(blockOffset, b, 0, read);
- blockOffset+=read;
- }
- } catch (Throwable e) {
- //shouldn't happen, but we'll invalidate this write and continue
- demote = false;
- //just continue to free
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block
to storage " + oid); //$NON-NLS-1$
- }
+ block = blockStore.writeToStorageBlock(info, is);
}
+ } catch (IOException e) {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to
storage " + oid); //$NON-NLS-1$
} finally {
//ensure post conditions
synchronized (info) {
@@ -764,6 +829,12 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Freed storage data
block", info.block, "of size", blockStore.blockSize); //$NON-NLS-1$
//$NON-NLS-2$
}
+ int segment = info.block/blockStore.blocksInUse.getBitsPerSegment();
+ if (!defragRunning.get()
+ && shouldDefrag(blockStore, segment)
+ && defragRunning.compareAndSet(false, true)) {
+ this.asynchPool.execute(defragTask);
+ }
info.block = EMPTY_ADDRESS;
}
}
@@ -775,6 +846,14 @@
return result;
}
+ boolean shouldDefrag(BlockStore blockStore, int segment) {
+ int highestBitSet = blockStore.blocksInUse.getHighestBitSet(segment);
+ int bitsSet = blockStore.blocksInUse.getBitsSet(segment);
+ highestBitSet = Math.max(bitsSet, Math.max(1, highestBitSet));
+ int freeBlocks = highestBitSet-bitsSet;
+ return freeBlocks > (highestBitSet>>2) &&
freeBlocks*blockStore.blockSize > minDefrag;
+ }
+
private void await(PhysicalInfo info, boolean pinned, boolean evicting) {
while ((pinned && info.pinned) || (evicting && info.evicting)) {
try {
@@ -785,14 +864,6 @@
}
}
- static int getAndSetNextClearBit(BlockStore bs) {
- int result = bs.blocksInUse.getAndSetNextClearBit();
- if (result == -1) {
- throw new TeiidRuntimeException("Out of blocks of size " + bs.blockSize);
//$NON-NLS-1$
- }
- return result;
- }
-
/**
* Eviction routine. When space is exhausted data blocks are acquired from
* memory entries.
@@ -886,6 +957,10 @@
public long getMemoryBufferSpace() {
return memoryBufferSpace;
}
+
+ public void setMinDefrag(long minDefrag) {
+ this.minDefrag = minDefrag;
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -23,8 +23,8 @@
package org.teiid.common.buffer.impl;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
@@ -81,6 +81,8 @@
*
* TODO: add detection of pinned batches to prevent unnecessary purging of non-persistent
batches
* - this is not necessary for already persistent batches, since we hold a weak
reference
+ *
+ * TODO: add a pre-fetch for tuplebuffers or some built-in correlation logic with the
queue.
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
@@ -214,7 +216,7 @@
}
@Override
- public List<? extends List<?>> deserialize(ObjectInputStream ois)
+ public List<? extends List<?>> deserialize(ObjectInput ois)
throws IOException, ClassNotFoundException {
List<? extends List<?>> batch = BatchSerializer.readBatch(ois, types);
if (lobManager != null) {
@@ -231,7 +233,7 @@
@Override
public void serialize(List<? extends List<?>> obj,
- ObjectOutputStream oos) throws IOException {
+ ObjectOutput oos) throws IOException {
int expectedModCount = 0;
ResizingArrayList<?> list = null;
if (obj instanceof ResizingArrayList<?>) {
@@ -626,7 +628,7 @@
} else {
waitCount >>= 1;
}
- int result = noWaitReserve(additional - committed);
+ int result = noWaitReserve(additional - committed, false);
committed += result;
}
return committed;
@@ -645,16 +647,19 @@
if (mode == BufferReserveMode.FORCE) {
this.reserveBatchKB.addAndGet(-count);
} else {
- result = noWaitReserve(count);
+ result = noWaitReserve(count, true);
}
reservedByThread.set(reservedByThread.get() + result);
persistBatchReferences();
return result;
}
- private int noWaitReserve(int count) {
+ private int noWaitReserve(int count, boolean allOrNothing) {
for (int i = 0; i < 2; i++) {
int reserveBatch = this.reserveBatchKB.get();
+ if (allOrNothing && count > reserveBatch) {
+ return 0;
+ }
count = Math.min(count, Math.max(0, reserveBatch));
if (count == 0) {
return 0;
@@ -828,9 +833,12 @@
void addMemoryEntry(CacheEntry ce, boolean initial) {
persistBatchReferences();
synchronized (ce) {
- memoryEntries.put(ce.getId(), ce);
+ boolean added = memoryEntries.put(ce.getId(), ce) == null;
if (initial) {
evictionQueue.add(ce);
+ } else if (added) {
+ evictionQueue.recordAccess(ce);
+ evictionQueue.add(ce);
} else {
evictionQueue.touch(ce);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -33,6 +33,7 @@
*/
public class ConcurrentBitSet {
+ private static final int CONCURRENT_MODIFICATION = -2;
private static final int ADDRESS_BITS_PER_TOP_VALUE = 18;
private static final int MAX_TOP_VALUE = 1 << ADDRESS_BITS_PER_TOP_VALUE;
@@ -40,6 +41,7 @@
int offset;
int maxBits;
int startSearch;
+ int highestBitSet = -1;
int bitsSet;
int[] topVals;
final BitSet bitSet;
@@ -58,6 +60,10 @@
private Segment[] segments;
private boolean compact;
+ /**
+ * @param maxBits
+ * @param concurrencyLevel - should be a power of 2
+ */
public ConcurrentBitSet(int maxBits, int concurrencyLevel) {
Assertion.assertTrue(maxBits > 0);
while ((bitsPerSegment = maxBits/concurrencyLevel) < concurrencyLevel) {
@@ -108,10 +114,34 @@
return counter.getAndIncrement();
}
- public int getAndSetNextClearBit(int start) {
+ /**
+ * return an estimate of the number of bits set
+ * @param segment
+ * @return
+ */
+ public int getBitsSet(int segment) {
+ Segment s = segments[segment&(segments.length-1)];
+ return s.bitsSet;
+ }
+
+ /**
+ * return an estimate of the highest bit (relative index) that has been set
+ * @param segment
+ * @return
+ */
+ public int getHighestBitSet(int segment) {
+ Segment s = segments[segment&(segments.length-1)];
+ return s.highestBitSet;
+ }
+
+ /**
+ * @param segment
+ * @return the next clear bit index as an absolute index - not relative to a segment
+ */
+ public int getAndSetNextClearBit(int segment) {
int nextBit = -1;
for (int i = 0; i < segments.length; i++) {
- Segment s = segments[(start+i)&(segments.length-1)];
+ Segment s = segments[(segment+i)&(segments.length-1)];
synchronized (s) {
if (s.bitsSet == s.maxBits) {
continue;
@@ -122,7 +152,7 @@
continue;
}
if (s.topVals[j] == 0) {
- if (j == start) {
+ if (j == segment) {
nextBit = s.startSearch;
break;
}
@@ -148,6 +178,7 @@
s.bitsSet++;
s.bitSet.set(nextBit);
s.startSearch = nextBit + 1;
+ s.highestBitSet = Math.max(s.highestBitSet, nextBit);
if (s.startSearch == s.maxBits) {
s.startSearch = 0;
}
@@ -187,4 +218,84 @@
this.compact = compact;
}
+
+ public int compactHighestBitSet(int segment) {
+ Segment s = segments[segment&(segments.length-1)];
+ //first do an unlocked compact
+ for (int i = 0; i < 3; i++) {
+ int result = tryCompactHighestBitSet(s);
+ if (result != CONCURRENT_MODIFICATION) {
+ return result;
+ }
+ }
+ synchronized (s) {
+ return tryCompactHighestBitSet(s);
+ }
+ }
+
+ private int tryCompactHighestBitSet(Segment s) {
+ int highestBitSet = 0;
+ synchronized (s) {
+ highestBitSet = s.highestBitSet;
+ if (highestBitSet <= 0) {
+ return 0;
+ }
+ if (s.bitSet.get(highestBitSet)) {
+ return highestBitSet;
+ }
+ }
+ int indexSearchStart = highestBitSet >> ADDRESS_BITS_PER_TOP_VALUE;
+ for (int j = indexSearchStart; j >= 0; j--) {
+ if (s.topVals[j] == 0) {
+ if (j==0) {
+ synchronized (s) {
+ if (s.highestBitSet != highestBitSet) {
+ return CONCURRENT_MODIFICATION;
+ }
+ s.highestBitSet = -1;
+ }
+ }
+ continue;
+ }
+ if (s.topVals[j] == MAX_TOP_VALUE) {
+ synchronized (s) {
+ if (s.highestBitSet != highestBitSet) {
+ return CONCURRENT_MODIFICATION;
+ }
+ s.highestBitSet = ((j + 1) * MAX_TOP_VALUE) -1;
+ }
+ break;
+ }
+ int index = j * MAX_TOP_VALUE;
+ int end = index + s.maxBits;
+ if (j == indexSearchStart) {
+ end = highestBitSet;
+ }
+ BitSet bs = s.bitSet;
+ int offset = 0;
+ if (j == indexSearchStart) {
+ bs = s.bitSet.get(index, end); //ensures that we look only at a subset of the words
+ offset = index;
+ }
+ index = index - offset;
+ end = end - offset - 1;
+ while (index < end) {
+ int next = bs.nextSetBit(index);
+ if (next == -1) {
+ index--;
+ break;
+ }
+ index = next + 1;
+ }
+ synchronized (s) {
+ if (s.highestBitSet != highestBitSet) {
+ return CONCURRENT_MODIFICATION;
+ }
+ s.highestBitSet = index + offset;
+ return s.highestBitSet;
+ }
+ }
+ return -1;
+ }
+
}
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -0,0 +1,105 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.io.StreamCorruptedException;
+
+public class DataObjectInputStream extends DataInputStream implements ObjectInput {
+
+ ObjectInput ois;
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+
+ public DataObjectInputStream(InputStream in) {
+ super(in);
+ }
+
+ @Override
+ public Object readObject() throws ClassNotFoundException, IOException {
+ if (ois == null) {
+ ois = new ObjectInputStream(this) {
+
+ @Override
+ protected void readStreamHeader() throws IOException,
+ StreamCorruptedException {
+ int version = readByte() & 0xFF;
+ if (version != STREAM_VERSION) {
+ throw new StreamCorruptedException("Unsupported version: " +
version); //$NON-NLS-1$
+ }
+ }
+
+ @Override
+ protected ObjectStreamClass readClassDescriptor()
+ throws IOException, ClassNotFoundException {
+ int type = read();
+ if (type < 0) {
+ throw new EOFException();
+ }
+ switch (type) {
+ case DataObjectOutputStream.TYPE_FAT_DESCRIPTOR:
+ return super.readClassDescriptor();
+ case DataObjectOutputStream.TYPE_THIN_DESCRIPTOR:
+ String className = readUTF();
+ Class<?> clazz = loadClass(className);
+ return ObjectStreamClass.lookup(clazz);
+ default:
+ className = DataObjectOutputStream.typeMapping.get((byte)type);
+ if (className == null) {
+ throw new StreamCorruptedException("Unknown class type " + type);
//$NON-NLS-1$
+ }
+ clazz = loadClass(className);
+ return ObjectStreamClass.lookup(clazz);
+ }
+ }
+
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
ClassNotFoundException {
+ String className = desc.getName();
+ try {
+ return loadClass(className);
+ } catch (ClassNotFoundException ex) {
+ return super.resolveClass(desc);
+ }
+ }
+
+ protected Class<?> loadClass(String className) throws ClassNotFoundException
{
+ Class<?> clazz;
+ if (classLoader != null) {
+ clazz = classLoader.loadClass(className);
+ } else {
+ clazz = Class.forName(className);
+ }
+ return clazz;
+ }
+ };
+ }
+ return ois.readObject();
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Extends the logic of Netty's CompactObjectOutputStream to use byte identifiers
+ * for some classes and to write/flush objects directly into the output.
+ *
+ * We can do this since buffer serialized data is ephemeral and good only for
+ * a single process.
+ */
+public class DataObjectOutputStream extends DataOutputStream implements ObjectOutput {
+
+ private static final int MAX_BYTE_IDS = 254;
+ static AtomicInteger counter = new AtomicInteger(2);
+ static final ConcurrentHashMap<String, Byte> knownClasses = new
ConcurrentHashMap<String, Byte>();
+ static final ConcurrentHashMap<Byte, String> typeMapping = new
ConcurrentHashMap<Byte, String>();
+
+ static final int TYPE_FAT_DESCRIPTOR = 0;
+ static final int TYPE_THIN_DESCRIPTOR = 1;
+
+ ObjectOutputStream oos;
+
+ public DataObjectOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void writeObject(Object obj) throws IOException {
+ if (oos == null) {
+ oos = new ObjectOutputStream(this) {
+ @Override
+ protected void writeStreamHeader() throws IOException {
+ writeByte(STREAM_VERSION);
+ }
+
+ @Override
+ protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
+ Class<?> clazz = desc.forClass();
+ if (clazz.isPrimitive() || clazz.isArray()) {
+ write(TYPE_FAT_DESCRIPTOR);
+ super.writeClassDescriptor(desc);
+ } else {
+ String name = desc.getName();
+ Byte b = knownClasses.get(name);
+ if (b == null && counter.get() < MAX_BYTE_IDS) {
+ synchronized (DataObjectOutputStream.class) {
+ b = knownClasses.get(name);
+ if (b == null && counter.get() < 254) {
+ b = (byte)counter.getAndIncrement();
+ knownClasses.put(name, b);
+ typeMapping.put(b, name);
+ }
+ }
+ }
+ if (b != null) {
+ write(b);
+ } else {
+ write(TYPE_THIN_DESCRIPTOR);
+ writeUTF(name);
+ }
+ }
+ }
+ };
+ }
+ oos.writeObject(obj);
+ oos.flush();
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -37,17 +37,18 @@
public void write(int b) throws IOException {
ensureBuffer();
- if (buf.remaining() == 0) {
- flush();
- }
buf.put((byte)b);
}
- private void ensureBuffer() {
- if (buf == null) {
- buf = newBuffer();
- startPosition = buf.position();
- }
+ private void ensureBuffer() throws IOException {
+ if (buf != null) {
+ if (buf.remaining() != 0) {
+ return;
+ }
+ flush();
+ }
+ buf = newBuffer();
+ startPosition = buf.position();
}
public void write(byte b[], int off, int len) throws IOException {
@@ -60,7 +61,6 @@
if (buf.remaining() > 0) {
break;
}
- flush();
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -34,7 +34,6 @@
/**
* A Concurrent LRFU eviction queue. Has assumptions that match buffermanager usage.
* Null values are not allowed.
- * @param <K>
* @param <V>
*/
public class LrfuEvictionQueue<V extends BaseCacheEntry> {
@@ -49,11 +48,13 @@
//TODO: adaptively adjust this value. more hits should move closer to lru
protected double crfLamda;
protected double inverseCrfLamda = 1 - crfLamda;
- protected int maxInterval;
+ protected int maxInterval; //don't consider the old ordering value after the
maxInterval
+ protected int minInterval; //cap the frequency gain under this interval (we can make
some values too hot otherwise)
+ private float minVal;
public LrfuEvictionQueue(AtomicLong clock) {
this.clock = clock;
- setCrfLamda(.0002);
+ setCrfLamda(.00005); //smaller values tend to work better since we're using
interval bounds
}
public boolean remove(V value) {
@@ -110,7 +111,7 @@
long delta = currentTime - longLastAccess;
orderingValue =
(float) (//Frequency component
- (delta>maxInterval?0:orderingValue*Math.pow(inverseCrfLamda, delta))
+ (delta<maxInterval?(delta<minInterval?minVal:Math.pow(inverseCrfLamda,
delta)):0)*orderingValue
//recency component
+ Math.pow(currentTime, crfLamda));
return orderingValue;
@@ -125,9 +126,14 @@
this.inverseCrfLamda = 1 - crfLamda;
int i = 0;
for (; i < 30; i++) {
- if ((float)Math.pow(inverseCrfLamda, 1<<i) == 0) {
+ float val = (float)Math.pow(inverseCrfLamda, 1<<i);
+ if (val == 0) {
break;
}
+ if (val > .8) {
+ minInterval = 1<<i;
+ this.minVal = val;
+ }
}
this.maxInterval = 1<<(i-1);
}
Modified:
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -298,7 +298,7 @@
TupleBuffer merged = createTupleBuffer();
int desiredSpace = activeTupleBuffers.size() * schemaSize;
- int reserved = Math.min(desiredSpace,
this.bufferManager.getMaxProcessingKB());
+ int reserved = Math.min(desiredSpace, Math.max(2*schemaSize,
this.bufferManager.getMaxProcessingKB()));
bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
if (desiredSpace > reserved) {
reserved += bufferManager.reserveAdditionalBuffers(desiredSpace -
reserved);
Modified:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -25,8 +25,8 @@
import static org.junit.Assert.*;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.lang.ref.WeakReference;
import org.junit.Test;
@@ -38,7 +38,7 @@
private final class SimpleSerializer implements Serializer<Integer> {
@Override
- public Integer deserialize(ObjectInputStream ois)
+ public Integer deserialize(ObjectInput ois)
throws IOException, ClassNotFoundException {
Integer result = ois.readInt();
for (int i = 0; i < result; i++) {
@@ -53,7 +53,7 @@
}
@Override
- public void serialize(Integer obj, ObjectOutputStream oos)
+ public void serialize(Integer obj, ObjectOutput oos)
throws IOException {
oos.writeInt(obj);
for (int i = 0; i < obj; i++) {
Modified:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-21
20:18:54 UTC (rev 3575)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -81,4 +81,34 @@
assertEquals(50, bst.getAndSetNextClearBit());
}
+ @Test public void testCompactHighest() {
+ ConcurrentBitSet bst = new ConcurrentBitSet(1 << 19, 1);
+ bst.setCompact(true);
+ for (int i = 0; i < bst.getTotalBits(); i++) {
+ bst.getAndSetNextClearBit();
+ }
+ assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(0));
+ assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(1));
+
+ for (int i = bst.getTotalBits()-20; i < bst.getTotalBits(); i++) {
+ bst.clear(i);
+ }
+
+ assertEquals(bst.getTotalBits()-21, bst.compactHighestBitSet(0));
+
+ for (int i = bst.getTotalBits()-20; i < bst.getTotalBits(); i++) {
+ bst.getAndSetNextClearBit();
+ }
+
+ assertEquals(-1, bst.getAndSetNextClearBit());
+
+ for (int i = 20; i < bst.getTotalBits(); i++) {
+ bst.clear(i);
+ }
+
+ assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(0));
+ assertEquals(19, bst.compactHighestBitSet(0));
+
+ }
+
}
Modified: trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-10-21
20:18:54 UTC (rev 3575)
+++ trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-10-22
11:40:30 UTC (rev 3576)
@@ -113,19 +113,20 @@
while (true) {
try {
nextFuture = rs.submitNext();
- if (!nextFuture.isDone()) {
- nextFuture.addCompletionListener(new
ResultsFuture.CompletionListener<Boolean>() {
- @Override
- public void onCompletion(ResultsFuture<Boolean> future) {
- if (processRow(future)) {
- if (rowsSent != rows2Send) {
- //this can be recursive, but ideally won't be called many times
- ResultsWorkItem.this.run();
- }
- }
- }
- });
- return;
+ synchronized (nextFuture) {
+ if (!nextFuture.isDone()) {
+ nextFuture.addCompletionListener(new
ResultsFuture.CompletionListener<Boolean>() {
+ @Override
+ public void onCompletion(ResultsFuture<Boolean> future) {
+ if (processRow(future)) {
+ if (rowsSent != rows2Send) {
+ ResultsWorkItem.this.run();
+ }
+ }
+ }
+ });
+ return;
+ }
}
if (!processRow(nextFuture)) {
break;