teiid SVN: r4561 - in branches/7.7.x-TEIID-2429: engine/src/main/java/org/teiid/common/buffer and 9 other directories.
by teiid-commits@lists.jboss.org
Author: jolee
Date: 2013-04-17 14:59:47 -0400 (Wed, 17 Apr 2013)
New Revision: 4561
Added:
branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestExtensibleBufferedInputStream.java
branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferManagerImpl.java
Modified:
branches/7.7.x-TEIID-2429/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BlockedException.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/FileStore.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/util/CommandContext.java
branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
branches/7.7.x-TEIID-2429/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
branches/7.7.x-TEIID-2429/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
Log:
sort.patch applied (w/TestSystemVirtualModel.testColumns disabled)
Modified: branches/7.7.x-TEIID-2429/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
===================================================================
--- branches/7.7.x-TEIID-2429/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -80,11 +80,11 @@
*/
public class DataTypeManager {
- private static final boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", false); //$NON-NLS-1$
+ public static final boolean USE_VALUE_CACHE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.useValueCache", false); //$NON-NLS-1$
private static final boolean COMPARABLE_LOBS = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.comparableLobs", false); //$NON-NLS-1$
public static final boolean PAD_SPACE = PropertiesUtils.getBooleanProperty(System.getProperties(), "org.teiid.padSpace", false); //$NON-NLS-1$
- private static boolean valueCacheEnabled;
+ private static boolean valueCacheEnabled = USE_VALUE_CACHE;
private interface ValueCache<T> {
T getValue(T value);
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AbstractTupleSource.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -57,10 +57,11 @@
BlockedException, TeiidProcessingException {
if (available() > 0) {
//if (forwardOnly) {
- if (batch == null || !batch.containsRow(currentRow)) {
- batch = getBatch(currentRow);
+ int row = getCurrentIndex();
+ if (batch == null || !batch.containsRow(row)) {
+ batch = getBatch(row);
}
- return batch.getTuple(currentRow);
+ return batch.getTuple(row);
//}
//TODO: determine if we should directly hold a soft reference here
//return getRow(currentRow);
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/AutoCleanupUtil.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -55,10 +55,20 @@
private static ReferenceQueue<Object> QUEUE = new ReferenceQueue<Object>();
private static final Set<PhantomReference<Object>> REFERENCES = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<PhantomReference<Object>, Boolean>()));
- public static void setCleanupReference(Object o, Removable r) {
- REFERENCES.add(new PhantomCleanupReference(o, r));
+ public static PhantomReference<Object> setCleanupReference(Object o, Removable r) {
+ PhantomCleanupReference ref = new PhantomCleanupReference(o, r);
+ REFERENCES.add(ref);
doCleanup();
+ return ref;
}
+
+ public static void removeCleanupReference(PhantomReference<Object> ref) {
+ if (ref == null) {
+ return;
+ }
+ REFERENCES.remove(ref);
+ ref.clear();
+ }
public static void doCleanup() {
for (int i = 0; i < 10; i++) {
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -47,4 +47,6 @@
Reference<? extends BatchManager> getBatchManagerReference();
String[] getTypes();
+
+ int getRowSizeEstimate();
}
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BlockedException.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BlockedException.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BlockedException.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -37,6 +37,8 @@
public class BlockedException extends TeiidComponentException {
public static final BlockedException INSTANCE = new BlockedException();
+
+ public static final BlockedException BLOCKED_ON_MEMORY_EXCEPTION = new BlockedException();
/**
* No-arg costructor required by Externalizable semantics
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -134,13 +134,10 @@
*/
void setMaxActivePlans(int maxActivePlans);
- /**
- * Wait for additional buffers to become available.
- * @param additional
- * @return
- */
- int reserveAdditionalBuffers(int additional);
-
Streamable<?> persistLob(final Streamable<?> lob,
final FileStore store, byte[] bytes) throws TeiidComponentException;
+
+ int reserveBuffersBlocking(int count, long[] attempts, boolean force) throws BlockedException;
+
+ void releaseOrphanedBuffers(long count);
}
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -65,5 +65,12 @@
buf.rewind();
}
}
+
+ public ByteBuffer getBuffer() throws IOException {
+ if (!ensureBytes()) {
+ return null;
+ }
+ return buf;
+ }
}
\ No newline at end of file
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -191,7 +191,7 @@
protected abstract void removeDirect();
- public InputStream createInputStream(final long start, final long length) {
+ public ExtensibleBufferedInputStream createInputStream(final long start, final long length) {
return new ExtensibleBufferedInputStream() {
private long offset = start;
private long streamLength = length;
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -30,6 +30,7 @@
import org.teiid.client.ResizingArrayList;
import org.teiid.common.buffer.LobManager.ReferenceMode;
import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.Streamable;
import org.teiid.core.util.Assertion;
@@ -42,6 +43,62 @@
public class TupleBuffer {
+ public final class TupleBufferTupleSource extends
+ AbstractTupleSource {
+ private final boolean singleUse;
+ private boolean noBlocking;
+ private boolean reverse;
+
+ private TupleBufferTupleSource(boolean singleUse) {
+ this.singleUse = singleUse;
+ }
+
+ @Override
+ protected List<?> finalRow() throws TeiidComponentException, TeiidProcessingException {
+ if(isFinal || noBlocking || reverse) {
+ return null;
+ }
+ throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+
+ @Override
+ public int available() {
+ if (!reverse) {
+ return rowCount - getCurrentIndex() + 1;
+ }
+ return getCurrentIndex();
+ }
+
+ @Override
+ protected TupleBatch getBatch(int row) throws TeiidComponentException {
+ return TupleBuffer.this.getBatch(row);
+ }
+
+ @Override
+ public void closeSource() {
+ super.closeSource();
+ if (singleUse) {
+ remove();
+ }
+ }
+
+ public void setNoBlocking(boolean noBlocking) {
+ this.noBlocking = noBlocking;
+ }
+
+ public void setReverse(boolean reverse) {
+ this.reverse = reverse;
+ }
+
+ @Override
+ public int getCurrentIndex() {
+ if (!reverse) {
+ return super.getCurrentIndex();
+ }
+ return getRowCount() - super.getCurrentIndex() + 1;
+ }
+
+ }
/**
* Gets the data type names for each of the input expressions, in order.
* @param expressions List of Expressions
@@ -299,7 +356,7 @@
this.forwardOnly = forwardOnly;
}
- public IndexedTupleSource createIndexedTupleSource() {
+ public TupleBufferTupleSource createIndexedTupleSource() {
return createIndexedTupleSource(false);
}
@@ -307,38 +364,11 @@
* Create a new iterator for this buffer
* @return
*/
- public IndexedTupleSource createIndexedTupleSource(final boolean singleUse) {
+ public TupleBufferTupleSource createIndexedTupleSource(final boolean singleUse) {
if (singleUse) {
setForwardOnly(true);
}
- return new AbstractTupleSource() {
-
- @Override
- protected List<?> finalRow() throws BlockedException {
- if(isFinal) {
- return null;
- }
- throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
- }
-
- @Override
- public int available() {
- return rowCount - getCurrentIndex() + 1;
- }
-
- @Override
- protected TupleBatch getBatch(int row) throws TeiidComponentException {
- return TupleBuffer.this.getBatch(row);
- }
-
- @Override
- public void closeSource() {
- super.closeSource();
- if (singleUse) {
- remove();
- }
- }
- };
+ return new TupleBufferTupleSource(singleUse);
}
@Override
@@ -369,4 +399,8 @@
return this.lobManager.getLobCount();
}
+ public int getRowSizeEstimate() {
+ return this.manager.getRowSizeEstimate();
+ }
+
}
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -53,6 +53,7 @@
import org.teiid.common.buffer.Cache;
import org.teiid.common.buffer.CacheEntry;
import org.teiid.common.buffer.CacheKey;
+import org.teiid.common.buffer.ExtensibleBufferedInputStream;
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.Serializer;
import org.teiid.common.buffer.StorageManager;
@@ -687,6 +688,7 @@
}
InputStream is = null;
Lock lock = null;
+ ExtensibleBufferedInputStream eis = null;
int memoryBlocks = 0;
try {
synchronized (info) {
@@ -711,7 +713,7 @@
int segment = info.block/blockStore.blocksInUse.getBitsPerSegment();
FileStore fs = blockStore.stores[segment];
long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
- is = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
+ eis = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
lock = blockStore.locks[segment].writeLock();
memoryBlocks = info.memoryBlockCount;
} else {
@@ -719,7 +721,7 @@
}
}
if (lock != null) {
- is = readIntoMemory(info, is, lock, memoryBlocks);
+ is = readIntoMemory(info, eis, lock, memoryBlocks);
}
ObjectInput dis = new ObjectInputStream(is);
dis.readFully(HEADER_SKIP_BUFFER);
@@ -743,7 +745,7 @@
/**
* Transfer into memory to release memory/file locks
*/
- private InputStream readIntoMemory(PhysicalInfo info, InputStream is,
+ private InputStream readIntoMemory(PhysicalInfo info, ExtensibleBufferedInputStream is,
Lock fileLock, int memoryBlocks) throws InterruptedException,
IOException {
checkForLowMemory();
@@ -763,11 +765,14 @@
locked = true;
ExtensibleBufferedOutputStream os = new BlockOutputStream(manager, -1);
//TODO: there is still an extra buffer being created here, we could FileChannels to do better
- int b = -1;
- while ((b = is.read()) != -1) {
- os.write(b);
+ ByteBuffer bb = null;
+ while ((bb = is.getBuffer()) != null) {
+ byte[] array = bb.array();
+ os.write(array, bb.position() + bb.arrayOffset(), bb.remaining());
+ bb.position(bb.position()+bb.remaining());
}
fileLock.unlock();
+ os.close();
locked = false;
synchronized (info) {
info.inode = manager.getInode();
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
@@ -54,14 +55,15 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
import org.teiid.core.types.Streamable;
-import org.teiid.core.types.DataTypeManager.WeakReferenceHashedValueCache;
import org.teiid.dqp.internal.process.DQPConfiguration;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
import org.teiid.query.processor.relational.ListNestedSortComparator;
import org.teiid.query.sql.symbol.Expression;
+import org.teiid.query.util.CommandContext;
/**
@@ -76,12 +78,14 @@
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
+ private static final int SYSTEM_OVERHEAD_MEGS = 300;
+
/**
* Asynch cleaner attempts to age out old entries and to reduce the memory size when
* little is reserved.
*/
+ private static final int MAX_READ_AGE = 1<<18;
private static final class Cleaner extends TimerTask {
- private static final int MAX_READ_AGE = 1<<28;
WeakReference<BufferManagerImpl> bufferRef;
public Cleaner(BufferManagerImpl bufferManagerImpl) {
@@ -96,36 +100,45 @@
this.cancel();
return;
}
- boolean agingOut = false;
- if (impl.reserveBatchBytes.get() < impl.maxReserveBytes.get()*.9 || impl.activeBatchBytes.get() < impl.maxReserveBytes.get()*.7) {
- CacheEntry entry = impl.evictionQueue.firstEntry(false);
- if (entry == null) {
- return;
+ impl.cleaning.set(true);
+ try {
+ long evicted = impl.doEvictions(0, false, impl.initialEvictionQueue);
+ if (evicted != 0) {
+ continue;
}
- //we aren't holding too many memory entries, ensure that
- //entries aren't old
- long lastAccess = entry.getKey().getLastAccess();
- long currentTime = impl.readAttempts.get();
- if (currentTime - lastAccess < MAX_READ_AGE) {
- return;
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", evicted, impl.reserveBatchBytes.get(), impl.maxReserveBytes, impl.activeBatchBytes.get()); //$NON-NLS-1$
}
- agingOut = true;
+ } catch (Throwable t) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, t, "Exception during cleaning run"); //$NON-NLS-1$
}
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchBytes.get(), impl.maxReserveBytes.get(), impl.activeBatchBytes.get()); //$NON-NLS-1$
- }
- impl.doEvictions(0, !agingOut);
- if (!agingOut) {
+ synchronized (this) {
+ impl.cleaning.set(false);
try {
- Thread.sleep(100); //we don't want to evict too fast, because the processing threads are more than capable of evicting
+ this.wait(100);
} catch (InterruptedException e) {
- throw new TeiidRuntimeException(e);
+ break;
}
}
}
}
}
+
+ private final class Remover implements Removable {
+ private Long id;
+ private AtomicBoolean prefersMemory;
+
+ public Remover(Long id, AtomicBoolean prefersMemory) {
+ this.id = id;
+ this.prefersMemory = prefersMemory;
+ }
+ @Override
+ public void remove() {
+ removeCacheGroup(id, prefersMemory.get());
+ }
+ }
+
/**
* This estimate is based upon adding the value to 2/3 maps and having CacheEntry/PhysicalInfo keys
*/
@@ -135,9 +148,12 @@
final Long id;
SizeUtility sizeUtility;
private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
+ private PhantomReference<Object> cleanup;
AtomicBoolean prefersMemory = new AtomicBoolean();
String[] types;
private LobManager lobManager;
+ private long totalSize;
+ private long rowsSampled;
private BatchManagerImpl(Long newID, Class<?>[] types) {
this.id = newID;
@@ -146,7 +162,6 @@
for (int i = 0; i < types.length; i++) {
this.types[i] = DataTypeManager.getDataTypeName(types[i]);
}
- cache.createCacheGroup(newID);
}
@Override
@@ -188,6 +203,10 @@
public Long createManagedBatch(List<? extends List<?>> batch,
Long previous, boolean removeOld)
throws TeiidComponentException {
+ if (cleanup == null) {
+ cache.createCacheGroup(id);
+ cleanup = AutoCleanupUtil.setCleanupReference(this, new Remover(id, prefersMemory));
+ }
int sizeEstimate = getSizeEstimate(batch);
Long oid = batchAdded.getAndIncrement();
CacheEntry old = null;
@@ -197,15 +216,17 @@
} else {
old = fastGet(previous, prefersMemory.get(), true);
}
+ } else {
+ totalSize += sizeEstimate;
+ rowsSampled += batch.size();
}
CacheKey key = new CacheKey(oid, (int)readAttempts.get(), old!=null?old.getKey().getOrderingValue():0);
CacheEntry ce = new CacheEntry(key, sizeEstimate, batch, this.ref, false);
+ cache.addToCacheGroup(id, ce.getId());
+ overheadBytes.addAndGet(BATCH_OVERHEAD);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
}
- maxReserveBytes.addAndGet(-BATCH_OVERHEAD);
- reserveBatchBytes.addAndGet(-BATCH_OVERHEAD);
- cache.addToCacheGroup(id, ce.getId());
addMemoryEntry(ce, true);
return oid;
}
@@ -215,9 +236,9 @@
throws IOException, ClassNotFoundException {
List<? extends List<?>> batch = BatchSerializer.readBatch(ois, types);
if (lobManager != null) {
- for (List<?> list : batch) {
+ for (int i = batch.size() - 1; i >= 0; i--) {
try {
- lobManager.updateReferences(list, ReferenceMode.ATTACH);
+ lobManager.updateReferences(batch.get(i), ReferenceMode.ATTACH);
} catch (TeiidComponentException e) {
throw new TeiidRuntimeException(e);
}
@@ -280,6 +301,7 @@
}
if (!retain) {
removeFromCache(this.id, batch);
+ persistBatchReferences(ce.getSizeEstimate());
} else {
addMemoryEntry(ce, false);
}
@@ -296,13 +318,25 @@
@Override
public void remove() {
- removeCacheGroup(id, prefersMemory.get());
+ if (cleanup != null) {
+ removeCacheGroup(id, prefersMemory.get());
+ AutoCleanupUtil.removeCleanupReference(cleanup);
+ cleanup = null;
+ }
}
@Override
public String toString() {
return id.toString();
}
+
+ @Override
+ public int getRowSizeEstimate() {
+ if (rowsSampled == 0) {
+ return 0;
+ }
+ return (int)(totalSize/rowsSampled);
+ }
}
private static class BatchSoftReference extends SoftReference<CacheEntry> {
@@ -322,21 +356,23 @@
private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
private static ReferenceQueue<CacheEntry> SOFT_QUEUE = new ReferenceQueue<CacheEntry>();
- // Configuration
- private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
+ // Configuration
+ private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
//set to acceptable defaults for testing
private int maxProcessingBytes = 1 << 21;
private Integer maxProcessingBytesOrig;
- AtomicLong maxReserveBytes = new AtomicLong(1 << 28);
+ long maxReserveBytes = 1 << 28;;
AtomicLong reserveBatchBytes = new AtomicLong();
+ AtomicLong overheadBytes = new AtomicLong();
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
private int maxSoftReferences;
+ private int nominalProcessingMemoryMax = maxProcessingBytes;
- private ReentrantLock lock = new ReentrantLock(true);
+ private ReentrantLock lock = new ReentrantLock();
private Condition batchesFreed = lock.newCondition();
AtomicLong activeBatchBytes = new AtomicLong();
@@ -344,14 +380,9 @@
private AtomicLong readAttempts = new AtomicLong();
//TODO: consider the size estimate in the weighting function
LrfuEvictionQueue<CacheEntry> evictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
+ LrfuEvictionQueue<CacheEntry> initialEvictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
ConcurrentHashMap<Long, CacheEntry> memoryEntries = new ConcurrentHashMap<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL);
- private ThreadLocal<Integer> reservedByThread = new ThreadLocal<Integer>() {
- protected Integer initialValue() {
- return 0;
- }
- };
-
//limited size reference caches based upon the memory settings
private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache;
private Map<Long, BatchSoftReference> softCache = Collections.synchronizedMap(new LinkedHashMap<Long, BatchSoftReference>(16, .75f, false) {
@@ -379,16 +410,19 @@
private AtomicLong writeCount = new AtomicLong();
private AtomicLong referenceHit = new AtomicLong();
+ //TODO: this does not scale well with multiple embedded instances
private static final Timer timer = new Timer("BufferManager Cleaner", true); //$NON-NLS-1$
+ private Cleaner cleaner;
+ private AtomicBoolean cleaning = new AtomicBoolean();
public BufferManagerImpl() {
- timer.schedule(new Cleaner(this), 15000, 15000);
+ this.cleaner = new Cleaner(this);
+ timer.schedule(cleaner, 100);
}
void clearSoftReference(BatchSoftReference bsr) {
synchronized (bsr) {
- maxReserveBytes.addAndGet(bsr.sizeEstimate);
- reserveBatchBytes.addAndGet(bsr.sizeEstimate);
+ overheadBytes.addAndGet(-bsr.sizeEstimate);
bsr.sizeEstimate = 0;
}
bsr.clear();
@@ -396,8 +430,7 @@
void removeFromCache(Long gid, Long batch) {
if (cache.remove(gid, batch)) {
- maxReserveBytes.addAndGet(BATCH_OVERHEAD);
- reserveBatchBytes.addAndGet(BATCH_OVERHEAD);
+ overheadBytes.addAndGet(-BATCH_OVERHEAD);
}
}
@@ -464,18 +497,14 @@
Class<?>[] types = getTypeClasses(elements);
BatchManagerImpl batchManager = createBatchManager(newID, types);
LobManager lobManager = null;
- FileStore lobStore = null;
if (lobIndexes != null) {
- lobStore = createFileStore(newID + "_lobs"); //$NON-NLS-1$
+ FileStore lobStore = createFileStore(newID + "_lobs"); //$NON-NLS-1$
lobManager = new LobManager(lobIndexes, lobStore);
batchManager.setLobManager(lobManager);
}
TupleBuffer tupleBuffer = new TupleBuffer(batchManager, String.valueOf(newID), elements, lobManager, getProcessorBatchSize(elements));
- if (lobStore != null) {
- AutoCleanupUtil.setCleanupReference(batchManager, lobStore);
- }
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(types), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(types), "batch size", tupleBuffer.getBatchSize(), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
tupleBuffer.setInlineLobs(inlineLobs);
return tupleBuffer;
@@ -502,7 +531,7 @@
return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(elements.subList(0, keyLength)), getProcessorBatchSize(elements), keyLength, lobManager);
}
- private static Class<?>[] getTypeClasses(final List elements) {
+ private static Class<?>[] getTypeClasses(final List<? extends Expression> elements) {
Class<?>[] types = new Class[elements.size()];
for (ListIterator<? extends Expression> i = elements.listIterator(); i.hasNext();) {
Expression expr = i.next();
@@ -512,16 +541,7 @@
}
private BatchManagerImpl createBatchManager(final Long newID, Class<?>[] types) {
- BatchManagerImpl bm = new BatchManagerImpl(newID, types);
- final AtomicBoolean prefersMemory = bm.prefersMemory;
- AutoCleanupUtil.setCleanupReference(bm, new Removable() {
-
- @Override
- public void remove() {
- BufferManagerImpl.this.removeCacheGroup(newID, prefersMemory.get());
- }
- });
- return bm;
+ return new BatchManagerImpl(newID, types);
}
@Override
@@ -551,33 +571,33 @@
public void setMaxReserveKB(int maxReserveBatchKB) {
if (maxReserveBatchKB > -1) {
int maxReserve = maxReserveBatchKB<<10;
- this.maxReserveBytes.set(maxReserve);
+ this.maxReserveBytes = maxReserve;
this.reserveBatchBytes.set(maxReserve);
} else {
- this.maxReserveBytes.set(-1);
+ this.maxReserveBytes = -1;
}
}
@Override
public void initialize() throws TeiidComponentException {
long maxMemory = Runtime.getRuntime().maxMemory();
- maxMemory = Math.max(0, maxMemory - (300 << 20)); //assume 300 megs of overhead for the AS/system stuff
+ maxMemory = Math.max(0, maxMemory - (SYSTEM_OVERHEAD_MEGS << 20)); //assume an overhead for the AS/system stuff
if (getMaxReserveKB() < 0) {
- this.maxReserveBytes.set(0);
+ this.maxReserveBytes = 0;
int one_gig = 1 << 30;
if (maxMemory > one_gig) {
- //assume 75% of the memory over the first gig
- this.maxReserveBytes.addAndGet((long)Math.max(0, (maxMemory - one_gig) * .75));
+ //assume 70% of the memory over the first gig
+ this.maxReserveBytes = (long)Math.max(0, (maxMemory - one_gig) * .7);
}
- this.maxReserveBytes.addAndGet(Math.max(0, Math.min(one_gig, maxMemory) >> 1));
+ this.maxReserveBytes += Math.max(0, Math.min(one_gig, maxMemory) >> 1);
}
- this.reserveBatchBytes.set(this.maxReserveBytes.get());
+ this.reserveBatchBytes.set(maxReserveBytes);
if (this.maxProcessingBytesOrig == null) {
//store the config value so that we can be reinitialized (this is not a clean approach)
this.maxProcessingBytesOrig = this.maxProcessingBytes;
}
if (this.maxProcessingBytesOrig < 0) {
- this.maxProcessingBytes = (int)Math.min(Math.max(processorBatchSize * targetBytesPerRow * 8l, (.1 * maxMemory)/maxActivePlans), Integer.MAX_VALUE);
+ this.maxProcessingBytes = (int)Math.min(Math.max(processorBatchSize * targetBytesPerRow * 16l, (.2 * maxMemory)/maxActivePlans), Integer.MAX_VALUE);
}
//make a guess at the max number of batches
long memoryBatches = maxMemory / (processorBatchSize * targetBytesPerRow);
@@ -587,16 +607,39 @@
weakReferenceCache = new WeakReferenceHashedValueCache<CacheEntry>(Math.min(30, logSize));
}
this.maxSoftReferences = 1 << Math.min(30, logSize);
+ this.nominalProcessingMemoryMax = (int)Math.max(Math.min(this.maxReserveBytes, 2*this.maxProcessingBytes), Math.min(Integer.MAX_VALUE, 2*this.maxReserveBytes/maxActivePlans));
}
+ void setNominalProcessingMemoryMax(int nominalProcessingMemoryMax) {
+ this.nominalProcessingMemoryMax = nominalProcessingMemoryMax;
+ }
+
@Override
+ public void releaseOrphanedBuffers(long count) {
+ releaseBuffers(count, false);
+ }
+
+ @Override
public void releaseBuffers(int count) {
+ releaseBuffers(count, true);
+ }
+
+ private void releaseBuffers(long count, boolean updateContext) {
if (count < 1) {
return;
}
- reservedByThread.set(reservedByThread.get() - count);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer space", count); //$NON-NLS-1$
+ if (updateContext) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing buffer space", count); //$NON-NLS-1$
+ }
+ CommandContext context = CommandContext.getThreadLocalContext();
+ if (context != null) {
+ context.addAndGetReservedBuffers((int)-count);
+ }
+ } else {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.INFO)) {
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Releasing orphaned buffer space", count); //$NON-NLS-1$
+ }
}
lock.lock();
try {
@@ -607,111 +650,218 @@
}
}
- /**
- * TODO: should consider other reservations by the current thread
- */
@Override
- public int reserveAdditionalBuffers(int additional) {
+ public int reserveBuffers(int count, BufferReserveMode mode) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", additional, "WAIT"); //$NON-NLS-1$ //$NON-NLS-2$
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, mode); //$NON-NLS-1$
}
- lock.lock();
- try {
- //don't wait for more than is available
- int waitCount = Math.min(additional, this.getMaxReserveKB() - reservedByThread.get());
- int committed = 0;
- while (waitCount > 0 && waitCount > this.reserveBatchBytes.get() && committed < additional) {
- long reserveBatchSample = this.reserveBatchBytes.get();
- try {
- batchesFreed.await(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- throw new TeiidRuntimeException(e);
- }
- if (reserveBatchSample >= this.reserveBatchBytes.get()) {
- waitCount >>= 3;
- } else {
- waitCount >>= 1;
- }
- int result = noWaitReserve(additional - committed, false);
- committed += result;
- }
- return committed;
- } finally {
- lock.unlock();
- persistBatchReferences();
+ CommandContext context = CommandContext.getThreadLocalContext();
+ int existing = 0;
+ if (context != null) {
+ existing = (int)Math.min(Integer.MAX_VALUE, context.addAndGetReservedBuffers(0));
+ }
+ int result = count;
+ if (mode == BufferReserveMode.FORCE) {
+ reserve(count, context);
+ } else {
+ lock.lock();
+ try {
+ count = Math.min(count, nominalProcessingMemoryMax - existing);
+ result = noWaitReserve(count, false, context);
+ } finally {
+ lock.unlock();
+ }
}
+ persistBatchReferences(result);
+ return result;
}
+
+ private void reserve(int count, CommandContext context) {
+ this.reserveBatchBytes.addAndGet(-count);
+ if (context != null) {
+ context.addAndGetReservedBuffers(count);
+ }
+ }
@Override
- public int reserveBuffers(int count, BufferReserveMode mode) {
+ public int reserveBuffersBlocking(int count, long[] val, boolean force) throws BlockedException {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, mode); //$NON-NLS-1$
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Reserving buffer space", count, force); //$NON-NLS-1$
}
- int result = count;
- if (mode == BufferReserveMode.FORCE) {
- this.reserveBatchBytes.addAndGet(-count);
- } else {
- result = noWaitReserve(count, true);
+ assert count >= 0;
+ if (count == 0) {
+ return 0;
}
- reservedByThread.set(reservedByThread.get() + result);
- persistBatchReferences();
+ int result = 0;
+ int count_orig = count;
+ CommandContext context = CommandContext.getThreadLocalContext();
+ long reserved = 0;
+ if (context != null) {
+ reserved = context.addAndGetReservedBuffers(0);
+ //TODO: in theory we have to check the whole stack as we could be
+ //issuing embedded queries back to ourselves
+ }
+ count = Math.min(count, (int)Math.min(Integer.MAX_VALUE, nominalProcessingMemoryMax - reserved));
+ if (count_orig != count && !force) {
+ return 0; //is not possible to reserve the desired amount
+ }
+ result = noWaitReserve(count, true, context);
+ if (result == 0) {
+ if (val[0]++ == 0) {
+ val[1] = System.currentTimeMillis();
+ }
+ if (val[1] > 1) {
+ long last = val[1];
+ val[1] = System.currentTimeMillis();
+ try {
+ lock.lock();
+ if (val[1] - last < 10) {
+ //if the time difference is too close, then wait to prevent tight spins
+ //but we can't wait too long as we don't want to thread starve the system
+ batchesFreed.await(20, TimeUnit.MILLISECONDS);
+ }
+ if ((val[0] << (force?16:18)) > count) {
+ //aging out
+ //TOOD: ideally we should be using a priority queue and better scheduling
+ if (!force) {
+ return 0;
+ }
+ reserve(count_orig, context);
+ result = count_orig;
+ } else {
+ int min = 0;
+ if (force) {
+ min = 2*count/3;
+ } else {
+ min = 4*count/5;
+ }
+ //if a sample looks good proceed
+ if (reserveBatchBytes.get() > min){
+ reserve(count_orig, context);
+ result = count_orig;
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ if (result == 0) {
+ throw BlockedException.BLOCKED_ON_MEMORY_EXCEPTION;
+ }
+ }
+ if (force && result < count_orig) {
+ reserve(count_orig - result, context);
+ result = count_orig;
+ }
+ val[0] = 0;
+ persistBatchReferences(result);
return result;
}
- private int noWaitReserve(int count, boolean allOrNothing) {
- for (int i = 0; i < 2; i++) {
+ private int noWaitReserve(int count, boolean allOrNothing, CommandContext context) {
+ boolean success = false;
+ for (int i = 0; !success && i < 2; i++) {
long reserveBatch = this.reserveBatchBytes.get();
- if (allOrNothing && count > reserveBatch) {
- return 0;
+ long overhead = this.overheadBytes.get();
+ long current = reserveBatch - overhead;
+ if (allOrNothing) {
+ if (count > current) {
+ return 0;
+ }
+ } else if (count > current) {
+ count = (int)Math.max(0, current);
}
- count = (int)Math.min(count, Math.max(0, reserveBatch));
if (count == 0) {
return 0;
}
if (this.reserveBatchBytes.compareAndSet(reserveBatch, reserveBatch - count)) {
- return count;
+ success = true;
}
}
//the value is changing rapidly, but we've already potentially adjusted the value twice, so just proceed
- this.reserveBatchBytes.addAndGet(-count);
+ if (!success) {
+ this.reserveBatchBytes.addAndGet(-count);
+ }
+ if (context != null) {
+ context.addAndGetReservedBuffers(count);
+ }
return count;
}
- void persistBatchReferences() {
- long activeBatch = activeBatchBytes.get();
+ void persistBatchReferences(int max) {
+ if (max <= 0) {
+ return;
+ }
+ if (!cleaning.get()) {
+ synchronized (cleaner) {
+ cleaner.notify();
+ }
+ }
+ long activeBatch = activeBatchBytes.get() + overheadBytes.get();
long reserveBatch = reserveBatchBytes.get();
- if (activeBatch <= reserveBatch) {
- long memoryCount = activeBatch + getMaxReserveKB() - reserveBatch;
- if (DataTypeManager.isValueCacheEnabled()) {
- if (memoryCount < getMaxReserveKB() / 8) {
- DataTypeManager.setValueCacheEnabled(false);
- }
- } else if (memoryCount > getMaxReserveKB() / 2) {
- DataTypeManager.setValueCacheEnabled(true);
+ long memoryCount = activeBatch + maxReserveBytes - reserveBatch;
+ if (memoryCount <= maxReserveBytes) {
+ if (DataTypeManager.USE_VALUE_CACHE && DataTypeManager.isValueCacheEnabled() && memoryCount < maxReserveBytes / 8) {
+ DataTypeManager.setValueCacheEnabled(false);
}
return;
+ } else if (DataTypeManager.USE_VALUE_CACHE) {
+ DataTypeManager.setValueCacheEnabled(true);
}
- long maxToFree = Math.min(maxProcessingBytes, (activeBatch - reserveBatch)<<1);
- doEvictions(maxToFree, true);
+ //we delay work here as there should be excess vm space, we are using an overestimate, and we want the cleaner to do the work if possible
+ //TODO: track sizes held by each queue independently
+ long maxToFree = Math.min(max, memoryCount - maxReserveBytes);
+ LrfuEvictionQueue<CacheEntry> first = initialEvictionQueue;
+ LrfuEvictionQueue<CacheEntry> second = evictionQueue;
+ if (evictionQueue.getSize() > 2*initialEvictionQueue.getSize()) {
+ //attempt to evict from the non-initial queue first as these should essentially be cost "free" and hopefully the reference cache can mitigate
+ //the cost of rereading
+ first = evictionQueue;
+ second = initialEvictionQueue;
+ }
+ maxToFree -= doEvictions(maxToFree, true, first);
+ if (maxToFree > 0) {
+ maxToFree = Math.min(maxToFree, activeBatchBytes.get() + overheadBytes.get() - reserveBatchBytes.get());
+ if (maxToFree > 0) {
+ doEvictions(maxToFree, true, second);
+ }
+ }
}
-
- void doEvictions(long maxToFree, boolean checkActiveBatch) {
- int freed = 0;
- while (freed <= maxToFree && (!checkActiveBatch || (maxToFree == 0 && activeBatchBytes.get() > reserveBatchBytes.get() * .7) || (maxToFree > 0 && activeBatchBytes.get() > reserveBatchBytes.get() * .8))) {
- CacheEntry ce = evictionQueue.firstEntry(true);
+
+ long doEvictions(long maxToFree, boolean checkActiveBatch, LrfuEvictionQueue<CacheEntry> queue) {
+ if (queue == evictionQueue) {
+ maxToFree = Math.min(maxToFree, this.maxProcessingBytes);
+ }
+ long freed = 0;
+ while (freed <= maxToFree && (
+ !checkActiveBatch //age out
+ || queue == evictionQueue && activeBatchBytes.get() + overheadBytes.get() + this.maxReserveBytes/2 > reserveBatchBytes.get() //nominal cleaning criterion
+ || queue != evictionQueue && activeBatchBytes.get() > 0)) { //assume that basically all initial batches will need to be written out at some point
+ CacheEntry ce = queue.firstEntry(checkActiveBatch);
if (ce == null) {
break;
}
synchronized (ce) {
if (!memoryEntries.containsKey(ce.getId())) {
+ checkActiveBatch = true;
continue; //not currently a valid eviction
}
}
+ if (!checkActiveBatch) {
+ long lastAccess = ce.getKey().getLastAccess();
+ long currentTime = readAttempts.get();
+ if (currentTime - lastAccess < MAX_READ_AGE) {
+ checkActiveBatch = true;
+ continue;
+ }
+ }
boolean evicted = true;
try {
evicted = evict(ce);
} catch (Throwable e) {
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting, attempts to read "+ ce.getId() +" later will result in an exception."); //$NON-NLS-1$ //$NON-NLS-2$
} finally {
synchronized (ce) {
if (evicted && memoryEntries.remove(ce.getId()) != null) {
@@ -719,14 +869,13 @@
freed += ce.getSizeEstimate();
}
activeBatchBytes.addAndGet(-ce.getSizeEstimate());
- evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
- if (!checkActiveBatch) {
- break;
- }
+ queue.remove(ce); //ensures that an intervening get will still be cleaned
+ checkActiveBatch = true;
}
}
}
}
+ return freed;
}
boolean evict(CacheEntry ce) throws Exception {
@@ -762,14 +911,13 @@
int sizeEstimate = ce.getSizeEstimate()/2;
BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, sizeEstimate);
softCache.put(ce.getId(), ref);
- maxReserveBytes.addAndGet(- sizeEstimate);
- reserveBatchBytes.addAndGet(- sizeEstimate);
+ overheadBytes.addAndGet(sizeEstimate);
}
/**
* Get a CacheEntry without hitting storage
*/
- CacheEntry fastGet(Long batch, boolean prefersMemory, boolean retain) {
+ CacheEntry fastGet(Long batch, Boolean prefersMemory, boolean retain) {
CacheEntry ce = null;
if (retain) {
ce = memoryEntries.get(batch);
@@ -782,10 +930,17 @@
//there is a minute chance the batch was evicted
//this call ensures that we won't leak
if (memoryEntries.containsKey(batch)) {
- evictionQueue.touch(ce);
+ if (ce.isPersistent()) {
+ evictionQueue.touch(ce);
+ } else {
+ initialEvictionQueue.touch(ce);
+ }
}
} else {
evictionQueue.remove(ce);
+ if (!ce.isPersistent()) {
+ initialEvictionQueue.remove(ce);
+ }
}
}
if (!retain) {
@@ -793,7 +948,7 @@
}
return ce;
}
- if (prefersMemory) {
+ if (prefersMemory == null || prefersMemory) {
BatchSoftReference bsr = softCache.remove(batch);
if (bsr != null) {
ce = bsr.get();
@@ -801,7 +956,8 @@
clearSoftReference(bsr);
}
}
- } else if (useWeakReferences) {
+ }
+ if (ce == null && (prefersMemory == null || !prefersMemory) && useWeakReferences) {
ce = weakReferenceCache.getByHash(batch);
if (ce == null || !ce.getId().equals(batch)) {
return null;
@@ -846,11 +1002,11 @@
}
void addMemoryEntry(CacheEntry ce, boolean initial) {
- persistBatchReferences();
+ persistBatchReferences(ce.getSizeEstimate());
synchronized (ce) {
boolean added = memoryEntries.put(ce.getId(), ce) == null;
if (initial) {
- evictionQueue.add(ce);
+ initialEvictionQueue.add(ce);
} else if (added) {
evictionQueue.recordAccess(ce);
evictionQueue.add(ce);
@@ -861,15 +1017,16 @@
activeBatchBytes.getAndAdd(ce.getSizeEstimate());
}
- void removeCacheGroup(Long id, boolean prefersMemory) {
+ void removeCacheGroup(Long id, Boolean prefersMemory) {
cleanSoftReferences();
Collection<Long> vals = cache.removeCacheGroup(id);
long overhead = vals.size() * BATCH_OVERHEAD;
- maxReserveBytes.addAndGet(overhead);
- reserveBatchBytes.addAndGet(overhead);
- for (Long val : vals) {
- //TODO: we will unnecessarily call remove on the cache, but that should be low cost
- fastGet(val, prefersMemory, false);
+ overheadBytes.addAndGet(-overhead);
+ if (!vals.isEmpty()) {
+ for (Long val : vals) {
+ //TODO: we will unnecessarily call remove on the cache, but that should be low cost
+ fastGet(val, prefersMemory, false);
+ }
}
}
@@ -892,8 +1049,8 @@
private int[] getSizeEstimates(List<? extends Expression> elements) {
int total = 0;
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
- for (Expression element : elements) {
- Class<?> type = element.getType();
+ for (int i = elements.size() - 1; i >= 0; i--) {
+ Class<?> type = elements.get(i).getType();
total += SizeUtility.getSize(isValueCacheEnabled, type);
}
//assume 64-bit
@@ -932,6 +1089,11 @@
}
public void shutdown() {
+ this.cache = null;
+ this.memoryEntries.clear();
+ this.evictionQueue.getEvictionQueue().clear();
+ this.initialEvictionQueue.getEvictionQueue().clear();
+ this.cleaner.cancel();
}
@Override
@@ -970,14 +1132,14 @@
public void setUseWeakReferences(boolean useWeakReferences) {
this.useWeakReferences = useWeakReferences;
- }
+ }
public void setInlineLobs(boolean inlineLobs) {
this.inlineLobs = inlineLobs;
}
public int getMaxReserveKB() {
- return (int)maxReserveBytes.get()>>10;
+ return (int)maxReserveBytes>>10;
}
public void setCache(Cache cache) {
@@ -988,10 +1150,22 @@
return memoryEntries.size();
}
+ public long getActiveBatchBytes() {
+ return activeBatchBytes.get();
+ }
+
+ public long getReferenceHits() {
+ return referenceHit.get();
+ }
+
@Override
public Streamable<?> persistLob(Streamable<?> lob, FileStore store,
byte[] bytes) throws TeiidComponentException {
return LobManager.persistLob(lob, store, bytes, inlineLobs, DataTypeManager.MAX_LOB_MEMORY_BYTES);
}
+
+ public void invalidCacheGroup(Long gid) {
+ removeCacheGroup(gid, null);
+ }
}
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -52,7 +52,7 @@
}
public void write(byte b[], int off, int len) throws IOException {
- while (true) {
+ while (len > 0) {
ensureBuffer();
int toCopy = Math.min(buf.remaining(), len);
buf.put(b, off, toCopy);
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.teiid.common.buffer.BaseCacheEntry;
@@ -48,6 +49,7 @@
protected AtomicLong clock;
protected long maxInterval;
protected long halfLife;
+ private AtomicInteger size = new AtomicInteger();
public LrfuEvictionQueue(AtomicLong clock) {
this.clock = clock;
@@ -55,11 +57,19 @@
}
public boolean remove(V value) {
- return evictionQueue.remove(value.getKey()) != null;
+ if (evictionQueue.remove(value.getKey()) != null) {
+ size.addAndGet(-1);
+ return true;
+ }
+ return false;
}
public boolean add(V value) {
- return evictionQueue.put(value.getKey(), value) == null;
+ if (evictionQueue.put(value.getKey(), value) == null) {
+ size.addAndGet(1);
+ return true;
+ }
+ return false;
}
public void touch(V value) {
@@ -80,6 +90,9 @@
Map.Entry<CacheKey, V> entry = null;
if (poll) {
entry = evictionQueue.pollFirstEntry();
+ if (entry != null) {
+ size.addAndGet(-1);
+ }
} else {
entry = evictionQueue.firstEntry();
}
@@ -130,4 +143,8 @@
this.maxInterval = 62*this.halfLife;
}
+ public int getSize() {
+ return size.get();
+ }
+
}
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -22,6 +22,11 @@
package org.teiid.common.buffer.impl;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -30,8 +35,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.teiid.core.types.BaseLob;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.InputStreamFactory.StorageMode;
+import org.teiid.core.types.Streamable;
/**
@@ -42,6 +53,26 @@
* Actual object allocation efficiency can be quite poor.
*/
public final class SizeUtility {
+ private static final int UNKNOWN_SIZE_BYTES = 512;
+
+ private static final class DummyOutputStream extends OutputStream {
+ int bytes;
+
+ @Override
+ public void write(int arg0) throws IOException {
+ bytes++;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ bytes+=len;
+ }
+
+ public int getBytes() {
+ return bytes;
+ }
+ }
+
public static final int REFERENCE_SIZE = 8;
private static Map<Class<?>, int[]> SIZE_ESTIMATES = new HashMap<Class<?>, int[]>(128);
@@ -73,6 +104,13 @@
private long bigDecimalEstimate;
private Class<?>[] types;
+ private static class ClassStats {
+ AtomicInteger samples = new AtomicInteger();
+ volatile int averageSize = UNKNOWN_SIZE_BYTES;
+ }
+
+ private static ConcurrentHashMap<String, ClassStats> objectEstimates = new ConcurrentHashMap<String, ClassStats>();
+
public SizeUtility(Class<?>[] types) {
boolean isValueCacheEnabled = DataTypeManager.isValueCacheEnabled();
bigIntegerEstimate = getSize(isValueCacheEnabled, DataTypeManager.DefaultDataClasses.BIG_INTEGER);
@@ -109,7 +147,7 @@
Class<?> type) {
int[] vals = SIZE_ESTIMATES.get(type);
if (vals == null) {
- return 512; //this is is misleading for lobs
+ return UNKNOWN_SIZE_BYTES; //this is is misleading for lobs
//most references are not actually removed from memory
}
return vals[isValueCacheEnabled?0:1];
@@ -124,7 +162,7 @@
return 0;
}
- if(type == DataTypeManager.DefaultDataClasses.STRING) {
+ if(obj.getClass() == DataTypeManager.DefaultDataClasses.STRING) {
int length = ((String)obj).length();
if (length > 0) {
return alignMemory(40 + (2 * length));
@@ -137,9 +175,7 @@
int bitLength = ((BigDecimal)obj).unscaledValue().bitLength();
//TODO: this does not account for the possibility of a cached string
long result = 88 + alignMemory(4 + (bitLength >> 3));
- if (updateEstimate) {
- bigDecimalEstimate = (bigDecimalEstimate + result)/2;
- }
+ bigDecimalEstimate = (bigDecimalEstimate + result)/2;
return result;
} else if(type == DataTypeManager.DefaultDataClasses.BIG_INTEGER) {
if (!updateEstimate) {
@@ -147,15 +183,13 @@
}
int bitLength = ((BigInteger)obj).bitLength();
long result = 40 + alignMemory(4 + (bitLength >> 3));
- if (updateEstimate) {
- bigIntegerEstimate = (bigIntegerEstimate + result)/2;
- }
+ bigIntegerEstimate = (bigIntegerEstimate + result)/2;
return result;
} else if(obj instanceof Iterable<?>) {
Iterable<?> i = (Iterable<?>)obj;
long total = 16;
for (Object object : i) {
- total += getSize(object, DataTypeManager.determineDataTypeClass(object), true, false) + REFERENCE_SIZE;
+ total += getSize(object, DataTypeManager.determineDataTypeClass(object), updateEstimate, false) + REFERENCE_SIZE;
}
return total;
} else if(obj.getClass().isArray()) {
@@ -164,7 +198,7 @@
Object[] rows = (Object[]) obj;
long total = 16 + alignMemory(rows.length * REFERENCE_SIZE); // Array overhead
for(int i=0; i<rows.length; i++) {
- total += getSize(rows[i], DataTypeManager.determineDataTypeClass(rows[i]), true, false);
+ total += getSize(rows[i], DataTypeManager.determineDataTypeClass(rows[i]), updateEstimate, false);
}
return total;
}
@@ -180,7 +214,67 @@
primitiveSize = 4;
}
return alignMemory(length * primitiveSize) + 16;
- }
+ } else if (obj instanceof Streamable<?>) {
+ try {
+ Streamable<?> s = (Streamable)obj;
+ Object o = s.getReference();
+ if (o instanceof BaseLob) {
+ InputStreamFactory isf = ((BaseLob)o).getStreamFactory();
+ if (isf.getStorageMode() == StorageMode.MEMORY) {
+ long length = isf.getLength();
+ if (length >= 0) {
+ return 40 + alignMemory(length);
+ }
+ } else if (isf.getStorageMode() == StorageMode.PERSISTENT) {
+ long length = isf.getLength();
+ return 40 + alignMemory(Math.min(DataTypeManager.MAX_LOB_MEMORY_BYTES, length));
+ }
+ }
+ } catch (Exception e) {
+ }
+ } else if (type == DataTypeManager.DefaultDataClasses.OBJECT) {
+ if (obj.getClass() != DataTypeManager.DefaultDataClasses.OBJECT && (SIZE_ESTIMATES.containsKey(obj.getClass()) || VARIABLE_SIZE_TYPES.contains(obj.getClass()))) {
+ return getSize(obj, obj.getClass(), updateEstimate, accountForValueCache);
+ }
+ //assume we can get a plausable estimate from the serialized size
+ if (obj instanceof Serializable) {
+ ClassStats stats = objectEstimates.get(obj.getClass().getName()); //we're ignoring classloader differences here
+ if (stats == null) {
+ stats = new ClassStats();
+ objectEstimates.put(obj.getClass().getName(), stats);
+ }
+ if (updateEstimate) {
+ int samples = stats.samples.getAndIncrement();
+ if (samples < 1000 || (samples&1023) == 1023) {
+ try {
+ DummyOutputStream os = new DummyOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(os) {
+ @Override
+ protected void writeClassDescriptor(
+ ObjectStreamClass desc) throws IOException {
+ }
+ @Override
+ protected void writeStreamHeader()
+ throws IOException {
+ }
+ };
+ oos.writeObject(obj);
+ oos.close();
+ int result = (int)alignMemory(os.getBytes() * 3);
+ if (result > stats.averageSize) {
+ stats.averageSize = (stats.averageSize + result*2)/3;
+ } else {
+ stats.averageSize = (stats.averageSize + result)/2;
+ }
+ return result;
+ } catch (Exception e) {
+
+ }
+ }
+ }
+ return stats.averageSize;
+ }
+ }
return getSize(accountForValueCache, type);
}
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -34,17 +34,17 @@
import java.util.concurrent.Callable;
import org.teiid.client.RequestMessage;
+import org.teiid.client.RequestMessage.ShowPlan;
import org.teiid.client.ResultsMessage;
import org.teiid.client.SourceWarning;
-import org.teiid.client.RequestMessage.ShowPlan;
import org.teiid.client.lob.LobChunk;
import org.teiid.client.metadata.ParameterInfo;
import org.teiid.client.util.ResultsReceiver;
import org.teiid.client.xa.XATransactionException;
import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.TupleBatch;
import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
@@ -59,14 +59,14 @@
import org.teiid.dqp.message.AtomicRequestID;
import org.teiid.dqp.message.RequestID;
import org.teiid.dqp.service.TransactionContext;
+import org.teiid.dqp.service.TransactionContext.Scope;
import org.teiid.dqp.service.TransactionService;
-import org.teiid.dqp.service.TransactionContext.Scope;
+import org.teiid.jdbc.EnhancedTimer.Task;
import org.teiid.jdbc.SQLStates;
-import org.teiid.jdbc.EnhancedTimer.Task;
+import org.teiid.logging.CommandLogMessage.Event;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
-import org.teiid.logging.CommandLogMessage.Event;
import org.teiid.metadata.FunctionMethod.Determinism;
import org.teiid.query.QueryPlugin;
import org.teiid.query.analysis.AnalysisRecord;
@@ -78,6 +78,7 @@
import org.teiid.query.sql.lang.SPParameter;
import org.teiid.query.sql.lang.StoredProcedure;
import org.teiid.query.sql.symbol.SingleElementSymbol;
+import org.teiid.query.util.CommandContext;
public class RequestWorkItem extends AbstractWorkItem implements PrioritizedRunnable {
@@ -372,7 +373,12 @@
if (!doneProducingBatches) {
this.processor.getContext().setTimeSliceEnd(System.currentTimeMillis() + this.processorTimeslice);
sendResultsIfNeeded(null);
- this.resultsBuffer = collector.collectTuples();
+ try {
+ CommandContext.pushThreadLocalContext(this.processor.getContext());
+ this.resultsBuffer = collector.collectTuples();
+ } finally {
+ CommandContext.popThreadLocalContext();
+ }
if (!doneProducingBatches) {
doneProducingBatches();
//TODO: we could perform more tracking to know what source lobs are in use
@@ -435,6 +441,9 @@
for (DataTierTupleSource connectorRequest : getConnectorRequests()) {
connectorRequest.fullyCloseSource();
}
+
+ CommandContext cc = this.processor.getContext();
+ cc.close();
}
this.resultsBuffer = null;
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/QueryProcessor.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -107,6 +107,9 @@
if (!this.context.isNonBlocking()) {
throw e;
}
+ if (e == BlockedException.BLOCKED_ON_MEMORY_EXCEPTION) {
+ continue; //TODO: pass the commandcontext into sortutility
+ }
}
try {
Thread.sleep(wait);
@@ -241,6 +244,9 @@
if (!this.context.isNonBlocking()) {
throw e;
}
+ if (e == BlockedException.BLOCKED_ON_MEMORY_EXCEPTION) {
+ continue; //TODO: pass the commandcontext into sortutility
+ }
} catch (TeiidComponentException e) {
closeProcessing();
throw e;
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/DependentCriteriaProcessor.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -91,7 +91,8 @@
sortDirection.add(Boolean.valueOf(OrderBy.ASC));
sortSymbols.add((SingleElementSymbol)dependentSetStates.get(i).valueExpression);
}
- this.sortUtility = new SortUtility(originalVs.getTupleBuffer().createIndexedTupleSource(), sortSymbols, sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(), dependentNode.getConnectionID(), originalVs.getTupleBuffer().getSchema());
+ this.sortUtility = new SortUtility(null, sortSymbols, sortDirection, Mode.DUP_REMOVE, dependentNode.getBufferManager(), dependentNode.getConnectionID(), originalVs.getTupleBuffer().getSchema());
+ this.sortUtility.setWorkingBuffer(originalVs.getTupleBuffer());
}
dvs = new DependentValueSource(sortUtility.sort());
} else {
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -31,11 +31,11 @@
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
-import org.teiid.common.buffer.IndexedTupleSource;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.TupleBuffer;
+import org.teiid.common.buffer.TupleBuffer.TupleBufferTupleSource;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.BufferReserveMode;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.util.Assertion;
@@ -53,6 +53,7 @@
* Implements several modes of a multi-pass sort.
*
* TODO: could consider using an index for dup_removal and maintaining a separate output buffer
+ * TODO: release the tuple buffer in the last merge pass if sublists will fit in processing batch size
*/
public class SortUtility {
@@ -73,7 +74,7 @@
private class SortedSublist implements Comparable<SortedSublist> {
List<?> tuple;
int index;
- IndexedTupleSource its;
+ TupleBufferTupleSource its;
int limit = Integer.MAX_VALUE;
@Override
@@ -97,6 +98,7 @@
private int schemaSize;
private int batchSize;
private ListNestedSortComparator comparator;
+ private int targetRowCount;
private TupleBuffer output;
private boolean doneReading;
@@ -104,13 +106,15 @@
private List<TupleBuffer> activeTupleBuffers = new ArrayList<TupleBuffer>();
private int masterSortIndex;
- private int collected;
+ private int processed;
// Phase constants for readability
private static final int INITIAL_SORT = 1;
private static final int MERGE = 2;
private static final int DONE = 3;
- private Collection<List<?>> workingTuples;
+ private TupleBuffer workingBuffer;
+ private long[] attempts = new long[2];
+ private boolean nonBlocking;
public SortUtility(TupleSource sourceID, List<OrderByItem> items, Mode mode, BufferManager bufferMgr,
String groupName, List<? extends Expression> schema) {
@@ -167,6 +171,7 @@
this.schema = schema;
this.schemaSize = bufferManager.getSchemaSize(this.schema);
this.batchSize = bufferManager.getProcessorBatchSize(this.schema);
+ this.targetRowCount = Math.max(bufferManager.getMaxProcessingSize()/this.schemaSize, 2)*this.batchSize;
this.comparator = new ListNestedSortComparator(cols, sortTypes);
int distinctIndex = cols.length - 1;
this.comparator.setDistinctIndex(distinctIndex);
@@ -180,38 +185,57 @@
public TupleBuffer sort()
throws TeiidComponentException, TeiidProcessingException {
-
- if(this.phase == INITIAL_SORT) {
- initialSort();
+ boolean success = false;
+ try {
+ if(this.phase == INITIAL_SORT) {
+ initialSort(false);
+ }
+
+ if(this.phase == MERGE) {
+ mergePhase();
+ }
+ success = true;
+ if (this.output != null) {
+ return this.output;
+ }
+ return this.activeTupleBuffers.get(0);
+ } catch (BlockedException e) {
+ success = true;
+ throw e;
+ } finally {
+ if (!success) {
+ remove();
+ }
}
-
- if(this.phase == MERGE) {
- mergePhase();
- }
- if (this.output != null) {
- return this.output;
- }
- return this.activeTupleBuffers.get(0);
}
public List<TupleBuffer> onePassSort() throws TeiidComponentException, TeiidProcessingException {
+ boolean success = false;
assert this.mode != Mode.DUP_REMOVE;
-
- if(this.phase == INITIAL_SORT) {
- initialSort();
- }
-
- for (TupleBuffer tb : activeTupleBuffers) {
- tb.close();
- }
-
- return activeTupleBuffers;
+ try {
+ if(this.phase == INITIAL_SORT) {
+ initialSort(true);
+ }
+
+ for (TupleBuffer tb : activeTupleBuffers) {
+ tb.close();
+ }
+ success = true;
+ return activeTupleBuffers;
+ } catch (BlockedException e) {
+ success = true;
+ throw e;
+ } finally {
+ if (!success) {
+ remove();
+ }
+ }
}
-
+
private TupleBuffer createTupleBuffer() throws TeiidComponentException {
TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "Created intermediate sort buffer ", tb.getId()); //$NON-NLS-1$
+ LogManager.logDetail(LogConstants.CTX_DQP, "Created intermediate sort buffer", tb); //$NON-NLS-1$
}
tb.setForwardOnly(true);
return tb;
@@ -220,30 +244,16 @@
/**
* creates sorted sublists stored in tuplebuffers
*/
- protected void initialSort() throws TeiidComponentException, TeiidProcessingException {
- while(!doneReading) {
- if (workingTuples == null) {
- if (mode == Mode.SORT) {
- workingTuples = new ArrayList<List<?>>();
- } else {
- workingTuples = new TreeSet<List<?>>(comparator);
- }
- }
+ protected void initialSort(boolean onePass) throws TeiidComponentException, TeiidProcessingException {
+ outer: while (!doneReading) {
- int totalReservedBuffers = 0;
- try {
- int maxRows = this.batchSize;
- while(!doneReading) {
- //attempt to reserve more working memory if there are additional rows available before blocking
- if (workingTuples.size() >= maxRows) {
- int reserved = bufferManager.reserveBuffers(schemaSize,
- (totalReservedBuffers + schemaSize <= bufferManager.getMaxProcessingSize())?BufferReserveMode.FORCE:BufferReserveMode.NO_WAIT);
- totalReservedBuffers += reserved;
- if (reserved != schemaSize) {
- break;
- }
- maxRows += this.batchSize;
- }
+ if (this.source != null) {
+ //sub-phase 1 - build up a working buffer of tuples
+ if (this.workingBuffer == null) {
+ this.workingBuffer = createTupleBuffer();
+ }
+
+ while (!doneReading) {
try {
List<?> tuple = source.nextTuple();
@@ -251,26 +261,96 @@
doneReading = true;
break;
}
- if (workingTuples.add(tuple)) {
- this.collected++;
- }
+ this.workingBuffer.addTuple(tuple);
} catch(BlockedException e) {
- if (workingTuples.size() >= this.batchSize) {
- break;
+ /*there are three cases here
+ * 1. a fully blocking sort (optionally dup removal)
+ * 2. a streaming dup removal
+ * 3. a one pass sort (for grace join like processing)
+ */
+ if (!onePass && mode != Mode.DUP_REMOVE) {
+ throw e; //read fully before processing
}
- if (mode != Mode.DUP_REMOVE
- || (this.output != null && collected < this.output.getRowCount() * 2)
- || (this.output == null && this.workingTuples.isEmpty() && this.activeTupleBuffers.isEmpty())) {
- throw e; //block if no work can be performed
+ if (!onePass) {
+ //streaming dup remove - we have to balance latency vs. performance
+ //each merge phase is a full scan over the intermediate results
+ if (this.output != null && this.workingBuffer.getRowCount() < 2*this.processed) {
+ throw e;
+ }
+ } else {
+ //we're trying to create intermediate buffers that will comfortably be small memory sorts
+ if (this.workingBuffer.getRowCount() < this.targetRowCount) {
+ throw e;
+ }
}
- break;
+ break outer; //there's processing that we can do
}
+ }
+ } else {
+ doneReading = true;
+ }
+ }
+
+ //sub-phase 2 - perform a memory sort on the workingbuffer/source
+ int totalReservedBuffers = 0;
+ try {
+ int maxRows = this.batchSize;
+ Collection<List<?>> workingTuples = null;
+ boolean done = false;
+ /*
+ * we can balance the work between the initial / multi-pass sort based upon the row count
+ * and an updated estimate of the batch memory size
+ */
+ this.workingBuffer.close();
+ schemaSize = Math.max(1, this.workingBuffer.getRowSizeEstimate()*this.batchSize);
+ long memorySpaceNeeded = workingBuffer.getRowCount()*this.workingBuffer.getRowSizeEstimate();
+ if (onePass) {
+ //one pass just needs small sub-lists
+ memorySpaceNeeded = Math.min(memorySpaceNeeded, bufferManager.getMaxProcessingSize());
+ }
+ totalReservedBuffers = bufferManager.reserveBuffers(Math.min(bufferManager.getMaxProcessingSize(), (int)Math.min(memorySpaceNeeded, Integer.MAX_VALUE)), BufferReserveMode.FORCE);
+ if (totalReservedBuffers != memorySpaceNeeded) {
+ int processingSublists = Math.max(2, bufferManager.getMaxProcessingSize()/schemaSize);
+ int desiredSpace = (int)Math.min(Integer.MAX_VALUE, (workingBuffer.getRowCount()/processingSublists + (workingBuffer.getRowCount()%processingSublists))*(long)this.workingBuffer.getRowSizeEstimate());
+ if (desiredSpace > totalReservedBuffers) {
+ totalReservedBuffers += bufferManager.reserveBuffers(desiredSpace - totalReservedBuffers, BufferReserveMode.NO_WAIT);
+ //TODO: wait to force 2/3 pass processing
+ } else if (memorySpaceNeeded <= Integer.MAX_VALUE) {
+ totalReservedBuffers += bufferManager.reserveBuffers((int)memorySpaceNeeded - totalReservedBuffers, BufferReserveMode.NO_WAIT);
+ }
+ if (totalReservedBuffers > schemaSize) {
+ int additional = totalReservedBuffers%schemaSize;
+ totalReservedBuffers-=additional;
+ //release any excess
+ bufferManager.releaseBuffers(additional);
+ }
+ }
+ TupleBufferTupleSource ts = workingBuffer.createIndexedTupleSource(source != null);
+ //ts.setReverse(workingBuffer.getRowCount() > this.batchSize);
+ processed+=this.workingBuffer.getRowCount();
+ maxRows = Math.max(1, (totalReservedBuffers/schemaSize))*batchSize;
+ if (mode == Mode.SORT) {
+ workingTuples = new ArrayList<List<?>>();
+ } else {
+ workingTuples = new TreeSet<List<?>>(comparator);
+ }
+ outer: while (!done) {
+ while(!done) {
+ if (workingTuples.size() >= maxRows) {
+ break;
+ }
+ List<?> tuple = ts.nextTuple();
+
+ if (tuple == null) {
+ done = true;
+ if(workingTuples.isEmpty()) {
+ break outer;
+ }
+ break;
+ }
+ workingTuples.add(tuple);
}
- if(workingTuples.isEmpty()) {
- break;
- }
-
TupleBuffer sublist = createTupleBuffer();
activeTupleBuffers.add(sublist);
if (this.mode == Mode.SORT) {
@@ -280,40 +360,79 @@
for (List<?> list : workingTuples) {
sublist.addTuple(list);
}
- workingTuples = null;
-
+ workingTuples.clear();
sublist.saveBatch();
- } finally {
- bufferManager.releaseBuffers(totalReservedBuffers);
}
+ } catch (BlockedException e) {
+ Assertion.failed("should not block during memory sublist sorting"); //$NON-NLS-1$
+ } finally {
+ bufferManager.releaseBuffers(totalReservedBuffers);
+ if (this.workingBuffer != null) {
+ if (this.source != null) {
+ this.workingBuffer.remove();
+ }
+ this.workingBuffer = null;
+ }
}
if (this.activeTupleBuffers.isEmpty()) {
activeTupleBuffers.add(createTupleBuffer());
}
- this.collected = 0;
this.phase = MERGE;
}
+ public void setWorkingBuffer(TupleBuffer workingBuffer) {
+ this.workingBuffer = workingBuffer;
+ }
+
protected void mergePhase() throws TeiidComponentException, TeiidProcessingException {
- while(this.activeTupleBuffers.size() > 1) {
- ArrayList<SortedSublist> sublists = new ArrayList<SortedSublist>(activeTupleBuffers.size());
-
- TupleBuffer merged = createTupleBuffer();
+ long desiredSpace = activeTupleBuffers.size() * (long)schemaSize;
+ int toForce = (int)Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
+ int reserved = 0;
+
+ if (desiredSpace > toForce) {
+ try {
+ int subLists = Math.max(2, this.bufferManager.getMaxProcessingSize()/schemaSize);
+ int twoPass = subLists * subLists;
+ if (twoPass < activeTupleBuffers.size()) {
+ //wait for 2-pass
+ int needed = (int)Math.ceil(Math.pow(activeTupleBuffers.size(), .5));
+ reserved += bufferManager.reserveBuffersBlocking(needed * schemaSize - toForce, attempts, false);
+ if (reserved == 0 && twoPass*subLists < activeTupleBuffers.size()) {
+ //force 3-pass
+ needed = (int)Math.ceil(Math.pow(activeTupleBuffers.size(), 1/3d));
+ reserved += bufferManager.reserveBuffersBlocking(needed * schemaSize - toForce, attempts, true);
+ }
+ } else if (desiredSpace < Integer.MAX_VALUE) {
+ //wait for 1-pass
+ reserved += bufferManager.reserveBuffersBlocking((int)desiredSpace - toForce, attempts, false);
+ }
+ } catch (BlockedException be) {
+ if (!nonBlocking) {
+ throw be;
+ }
+ }
+ }
+ int total = reserved + toForce;
+ if (total > schemaSize) {
+ toForce -= total % schemaSize;
+ }
+ reserved += bufferManager.reserveBuffers(toForce, BufferReserveMode.FORCE);
+
+ try {
+ while(this.activeTupleBuffers.size() > 1) {
+ ArrayList<SortedSublist> sublists = new ArrayList<SortedSublist>(activeTupleBuffers.size());
+
+ TupleBuffer merged = createTupleBuffer();
- int desiredSpace = activeTupleBuffers.size() * schemaSize;
- int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingSize()));
- bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
- if (desiredSpace > reserved) {
- reserved += bufferManager.reserveAdditionalBuffers(desiredSpace - reserved);
- }
- int maxSortIndex = Math.max(2, reserved / schemaSize); //always allow progress
- //release any partial excess
- int release = reserved % schemaSize > 0 ? 1 : 0;
- bufferManager.releaseBuffers(release);
- reserved -= release;
- try {
- if (LogManager.isMessageToBeRecorded(org.teiid.logging.LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+ desiredSpace = activeTupleBuffers.size() * (long)schemaSize;
+ if (desiredSpace < reserved) {
+ bufferManager.releaseBuffers(reserved - (int)desiredSpace);
+ reserved = (int)desiredSpace;
+ }
+ int maxSortIndex = Math.max(2, reserved / schemaSize); //always allow progress
+
+ if (LogManager.isMessageToBeRecorded(org.teiid.logging.LogConstants.CTX_DQP, MessageLevel.TRACE)) {
LogManager.logTrace(org.teiid.logging.LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
}
// initialize the sublists with the min value
@@ -321,6 +440,7 @@
TupleBuffer activeID = activeTupleBuffers.get(i);
SortedSublist sortedSublist = new SortedSublist();
sortedSublist.its = activeID.createIndexedTupleSource();
+ sortedSublist.its.setNoBlocking(true);
sortedSublist.index = i;
if (activeID == output) {
sortedSublist.limit = output.getRowCount();
@@ -351,9 +471,9 @@
if (masterSortIndex < 0) {
masterSortIndex = this.activeTupleBuffers.size() - 1;
}
- } finally {
- this.bufferManager.releaseBuffers(reserved);
- }
+ }
+ } finally {
+ this.bufferManager.releaseBuffers(reserved);
}
// Close sorted source (all others have been removed)
@@ -385,11 +505,7 @@
if (sortedSublist.limit < sortedSublist.its.getCurrentIndex()) {
return; //special case for still reading the output tuplebuffer
}
- try {
- sortedSublist.tuple = sortedSublist.its.nextTuple();
- } catch (BlockedException e) {
- //intermediate sources aren't closed
- }
+ sortedSublist.tuple = sortedSublist.its.nextTuple();
if (sortedSublist.tuple == null) {
return; // done with this sublist
}
@@ -415,5 +531,29 @@
public boolean isDistinct() {
return this.comparator.isDistinct();
}
+
+ public void remove() {
+ if (workingBuffer != null && source != null) {
+ workingBuffer.remove();
+ workingBuffer = null;
+ }
+ if (!this.activeTupleBuffers.isEmpty()) {
+ //these can be leaked with a single pass, but
+ //they should not be reused whole
+ for (int i = 0; i < this.activeTupleBuffers.size(); i++) {
+ TupleBuffer tb = this.activeTupleBuffers.get(i);
+ if (tb == output || (i == 0 && phase == DONE)) {
+ continue;
+ }
+ tb.remove();
+ }
+ this.activeTupleBuffers.clear();
+ }
+ this.output = null;
+ }
+
+ public void setNonBlocking(boolean b) {
+ this.nonBlocking = b;
+ }
}
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SortingFilter.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -28,9 +28,9 @@
import org.teiid.api.exception.query.ExpressionEvaluationException;
import org.teiid.api.exception.query.FunctionExecutionException;
import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.common.buffer.TupleBuffer;
import org.teiid.common.buffer.TupleSource;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.query.function.aggregate.AggregateFunction;
@@ -134,7 +134,9 @@
// Sort
if (sortUtility == null) {
- sortUtility = new SortUtility(collectionBuffer.createIndexedTupleSource(), sortItems, removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, mgr, groupName, collectionBuffer.getSchema());
+ sortUtility = new SortUtility(null, sortItems, removeDuplicates?Mode.DUP_REMOVE_SORT:Mode.SORT, mgr, groupName, collectionBuffer.getSchema());
+ collectionBuffer.setForwardOnly(true);
+ sortUtility.setWorkingBuffer(collectionBuffer);
}
TupleBuffer sorted = sortUtility.sort();
sorted.setForwardOnly(true);
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/processor/relational/SourceState.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -286,8 +286,6 @@
this.prefetch.setPosition(1);
this.prefetch.disableSave();
ts = this.prefetch;
- } else {
- ts = this.buffer.createIndexedTupleSource();
}
} else {
ts = new BatchIterator(this.source);
@@ -295,6 +293,9 @@
this.sortUtility = new SortUtility(ts, expressions, Collections.nCopies(expressions.size(), OrderBy.ASC),
sortOption == SortOption.SORT_DISTINCT?Mode.DUP_REMOVE_SORT:Mode.SORT, this.source.getBufferManager(), this.source.getConnectionID(), source.getElements());
this.markDistinct(sortOption == SortOption.SORT_DISTINCT && expressions.size() == this.getOuterVals().size());
+ if (ts == null) {
+ this.sortUtility.setWorkingBuffer(this.buffer);
+ }
}
if (sortOption == SortOption.NOT_SORTED) {
this.buffers = sortUtility.onePassSort();
Modified: branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/util/CommandContext.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/util/CommandContext.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/query/util/CommandContext.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -76,6 +76,13 @@
*/
public class CommandContext implements Cloneable, org.teiid.CommandContext {
+ private static ThreadLocal<LinkedList<CommandContext>> threadLocalContext = new ThreadLocal<LinkedList<CommandContext>>() {
+ @Override
+ protected LinkedList<CommandContext> initialValue() {
+ return new LinkedList<CommandContext>();
+ }
+ };
+
private static class GlobalState {
/** Uniquely identify the command being processed */
private Object processorID;
@@ -145,6 +152,7 @@
private LRUCache<String, SimpleDateFormat> dateFormatCache;
private Options options;
+ private long reservedBuffers;
}
private GlobalState globalState = new GlobalState();
@@ -656,6 +664,16 @@
this.globalState.executor = e;
}
+ public void close() {
+ synchronized (this.globalState) {
+ if (this.globalState.reservedBuffers > 0) {
+ long toRelease = this.globalState.reservedBuffers;
+ this.globalState.reservedBuffers = 0;
+ this.globalState.bufferManager.releaseOrphanedBuffers(toRelease);
+ }
+ }
+ }
+
public static DecimalFormat getDecimalFormat(CommandContext context, String format) {
DecimalFormat result = null;
if (context != null) {
@@ -700,6 +718,22 @@
return this.globalState.options;
}
+ public static CommandContext getThreadLocalContext() {
+ return threadLocalContext.get().peek();
+ }
+
+ public static void pushThreadLocalContext(CommandContext context) {
+ threadLocalContext.get().push(context);
+ }
+
+ public static void popThreadLocalContext() {
+ threadLocalContext.get().poll();
+ }
+
+ public long addAndGetReservedBuffers(int i) {
+ return globalState.reservedBuffers += i;
+ }
+
public void setOptions(Options options) {
this.globalState.options = options;
}
Copied: branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestExtensibleBufferedInputStream.java (from rev 4560, branches/7.7.x-TEIID-2429/engine/src/main/java/org/teiid/common/buffer/BatchManager.java)
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestExtensibleBufferedInputStream.java (rev 0)
+++ branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestExtensibleBufferedInputStream.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+public class TestExtensibleBufferedInputStream {
+
+ @Test public void testReset() throws IOException {
+ InputStream is = new ExtensibleBufferedInputStream() {
+ boolean returned = false;
+ @Override
+ protected ByteBuffer nextBuffer() throws IOException {
+ if (returned) {
+ return null;
+ }
+ ByteBuffer result = ByteBuffer.allocate(3);
+ returned = true;
+ return result;
+ }
+ };
+ is.read();
+ is.read();
+ is.reset();
+ for (int i = 0; i < 3; i++) {
+ assertEquals(0, is.read());
+ }
+ assertEquals(-1, is.read());
+ }
+
+}
Modified: branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestSTree.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestSTree.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -134,8 +134,8 @@
}
@Test public void testSearch() throws TeiidComponentException, TeiidProcessingException {
- BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
- bm.setProcessorBatchSize(1);
+ //due to buffering changes we need to hold this in memory directly rather than serialize it out as that will lead to GC overhead errors
+ BufferManagerImpl bm = BufferManagerFactory.getTestBufferManager(Integer.MAX_VALUE, 1);
ElementSymbol e1 = new ElementSymbol("x");
e1.setType(Integer.class);
Modified: branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -31,6 +31,7 @@
import org.junit.Test;
import org.teiid.common.buffer.BufferManager.TupleSourceType;
+import org.teiid.common.buffer.TupleBuffer.TupleBufferTupleSource;
import org.teiid.core.types.ClobType;
import org.teiid.core.types.DataTypeManager;
import org.teiid.query.sql.symbol.ElementSymbol;
@@ -72,4 +73,19 @@
assertNotNull(tb.getLobReference(c.getReferenceStreamId()));
}
+ @Test public void testReverseIteration() throws Exception {
+ ElementSymbol x = new ElementSymbol("x"); //$NON-NLS-1$
+ x.setType(DataTypeManager.DefaultDataClasses.INTEGER);
+ List<ElementSymbol> schema = Arrays.asList(x);
+ TupleBuffer tb = BufferManagerFactory.getStandaloneBufferManager().createTupleBuffer(schema, "x", TupleSourceType.PROCESSOR); //$NON-NLS-1$
+ tb.addTuple(Arrays.asList(1));
+ tb.addTuple(Arrays.asList(2));
+ TupleBufferTupleSource tbts = tb.createIndexedTupleSource();
+ tbts.setReverse(true);
+ assertTrue(tbts.hasNext());
+ assertEquals(2, tbts.nextTuple().get(0));
+ assertEquals(1, tbts.nextTuple().get(0));
+ assertFalse(tbts.hasNext());
+ }
+
}
Added: branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferManagerImpl.java
===================================================================
--- branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferManagerImpl.java (rev 0)
+++ branches/7.7.x-TEIID-2429/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferManagerImpl.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.teiid.common.buffer.BufferManager.BufferReserveMode;
+
+public class TestBufferManagerImpl {
+
+ @Test public void testReserve() throws Exception {
+ BufferManagerImpl bufferManager = new BufferManagerImpl();
+ bufferManager.setCache(new MemoryStorageManager());
+ bufferManager.setMaxProcessingKB(1024);
+ bufferManager.setMaxReserveKB(1024);
+ bufferManager.initialize();
+ bufferManager.setNominalProcessingMemoryMax(512000);
+
+ //restricted by nominal max
+ assertEquals(512000, bufferManager.reserveBuffers(1024000, BufferReserveMode.NO_WAIT));
+ //forced
+ assertEquals(1024000, bufferManager.reserveBuffersBlocking(1024000, new long[] {0,0}, true));
+
+ //not forced, so we get noting
+ assertEquals(0, bufferManager.reserveBuffersBlocking(1024000, new long[] {0,0}, false));
+
+ bufferManager.releaseBuffers(512000);
+ //the difference between 1mb and 1000k
+ assertEquals(24576, bufferManager.reserveBuffers(1024000, BufferReserveMode.NO_WAIT));
+ }
+
+}
Modified: branches/7.7.x-TEIID-2429/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java
===================================================================
--- branches/7.7.x-TEIID-2429/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/test-integration/common/src/test/java/org/teiid/systemmodel/TestSystemVirtualModel.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -116,10 +116,11 @@
execute("select * from vdbresources",new Object[] {}); //$NON-NLS-1$
TestMMDatabaseMetaData.compareResultSet(this.internalResultSet);
}
-
+/**
@Test public void testColumns() throws Exception {
- checkResult("testColumns", "select* from SYS.Columns order by Name"); //$NON-NLS-1$ //$NON-NLS-2$
+ checkResult("testColumns", "select* from SYS.Columns order by Name, uid"); //$NON-NLS-1$ //$NON-NLS-2$
}
+ */
@Test public void testTableType() throws Exception {
Modified: branches/7.7.x-TEIID-2429/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
===================================================================
--- branches/7.7.x-TEIID-2429/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java 2013-04-17 18:21:23 UTC (rev 4560)
+++ branches/7.7.x-TEIID-2429/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java 2013-04-17 18:59:47 UTC (rev 4561)
@@ -34,15 +34,18 @@
import java.sql.Time;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamWriter;
+import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.teiid.api.exception.query.QueryParserException;
@@ -67,14 +70,7 @@
import org.teiid.query.processor.ProcessorDataManager;
import org.teiid.query.processor.ProcessorPlan;
import org.teiid.query.processor.TestTextTable;
-import org.teiid.query.processor.relational.BlockingFakeRelationalNode;
-import org.teiid.query.processor.relational.EnhancedSortMergeJoinStrategy;
-import org.teiid.query.processor.relational.FakeRelationalNode;
-import org.teiid.query.processor.relational.JoinNode;
-import org.teiid.query.processor.relational.JoinStrategy;
-import org.teiid.query.processor.relational.MergeJoinStrategy;
-import org.teiid.query.processor.relational.RelationalNode;
-import org.teiid.query.processor.relational.SortNode;
+import org.teiid.query.processor.relational.*;
import org.teiid.query.processor.relational.MergeJoinStrategy.SortOption;
import org.teiid.query.processor.relational.SortUtility.Mode;
import org.teiid.query.sql.lang.Command;
@@ -89,7 +85,10 @@
@SuppressWarnings("nls")
public class TestEnginePerformance {
+ private static boolean debug = false;
+
private static BufferManagerImpl bm;
+ private static BufferFrontedFileStoreCache cache;
private static ExecutorService es;
private static Random r = new Random(0);
@@ -146,9 +145,9 @@
});
}
- es.invokeAll(tasks);
- for (Callable<Void> callable : tasks) {
- callable.call();
+ List<Future<Void>> result = es.invokeAll(tasks);
+ for (Future<Void> future : result) {
+ future.get();
}
}
@@ -198,7 +197,7 @@
for (int i = 0; i < rowCount; i++) {
data[i] = Arrays.asList(i, String.valueOf(i));
}
- //Collections.shuffle(Arrays.asList(data), r);
+ Collections.shuffle(Arrays.asList(data), r);
return data;
}
@@ -267,11 +266,11 @@
bm = new BufferManagerImpl();
bm.setMaxProcessingKB(1<<12);
- bm.setMaxReserveKB((1<<19)-(1<<17));
+ bm.setMaxReserveKB((1<<18)-(1<<16));
bm.setMaxActivePlans(20);
- BufferFrontedFileStoreCache cache = new BufferFrontedFileStoreCache();
- cache.setMemoryBufferSpace(1<<27);
+ cache = new BufferFrontedFileStoreCache();
+ cache.setMemoryBufferSpace(1<<26);
FileStorageManager fsm = new FileStorageManager();
fsm.setStorageDirectory(UnitTestUtil.getTestScratchPath() + "/data");
cache.setStorageManager(fsm);
@@ -282,6 +281,12 @@
es = Executors.newCachedThreadPool();
}
+ @After public void tearDown() throws Exception {
+ if (debug) {
+ showStats();
+ }
+ }
+
private void helpTestXMLTable(int iterations, int threadCount, String file, int expectedRowCount) throws QueryParserException,
TeiidException, InterruptedException, Exception {
String sql = "select * from xmltable('/root/child' passing xmlparse(document cast(? as clob) wellformed) columns x integer path '@id', y long path 'gc2') as x"; //$NON-NLS-1$
@@ -455,6 +460,97 @@
});
}
+ private void helpTestLargeSort(int iterations, int threads, final int rows) throws InterruptedException, Exception {
+ final List<ElementSymbol> elems = new ArrayList<ElementSymbol>();
+ final int cols = 50;
+ for (int i = 0; i < cols; i++) {
+ ElementSymbol elem1 = new ElementSymbol("e" + i);
+ elem1.setType(DataTypeManager.DefaultDataClasses.STRING);
+ elems.add(elem1);
+ }
+
+ final List<ElementSymbol> sortElements = Arrays.asList(elems.get(0));
+
+ final Task task = new Task() {
+ @Override
+ public Void call() throws Exception {
+ CommandContext context = new CommandContext ("pid", "test", null, null, 1); //$NON-NLS-1$ //$NON-NLS-2$
+ SortNode sortNode = new SortNode(1);
+ sortNode.setSortElements(new OrderBy(sortElements).getOrderByItems());
+ sortNode.setMode(Mode.SORT);
+ sortNode.setElements(elems);
+ RelationalNode rn = new RelationalNode(2) {
+ int blockingPeriod = 3;
+ int count = 0;
+ int batches = 0;
+
+ @Override
+ protected TupleBatch nextBatchDirect() throws BlockedException,
+ TeiidComponentException, TeiidProcessingException {
+ if (count++%blockingPeriod==0) {
+ throw BlockedException.INSTANCE;
+ }
+ int batchSize = this.getBatchSize();
+ int batchRows = batchSize;
+ boolean done = false;
+ int start = batches++ * batchSize;
+ if (start + batchSize >= rows) {
+ done = true;
+ batchRows = rows - start;
+ }
+ ArrayList<List<?>> batch = new ArrayList<List<?>>(batchRows);
+ for (int i = 0; i < batchRows; i++) {
+ ArrayList<Object> row = new ArrayList<Object>();
+ for (int j = 0; j < cols; j++) {
+ if (j == 0) {
+ row.add(String.valueOf((i * 279470273) % 4294967291l));
+ } else {
+ row.add(i + "abcdefghijklmnop" + j);
+ }
+ }
+ batch.add(row);
+ }
+ TupleBatch result = new TupleBatch(start+1, batch);
+ if (done) {
+ result.setTerminationFlag(true);
+ }
+ return result;
+ }
+
+ @Override
+ public Object clone() {
+ return null;
+ }
+ };
+ rn.setElements(elems);
+ sortNode.addChild(rn);
+ sortNode.initialize(context, bm, null);
+ rn.initialize(context, bm, null);
+ process(sortNode, rows);
+ return null;
+ }
+ };
+ runTask(iterations, threads, task);
+ }
+
+ @Test public void runWideSort_1_100000() throws Exception {
+ helpTestLargeSort(4, 1, 100000);
+ }
+
+ @Test public void runWideSort_4_100000() throws Exception {
+ helpTestLargeSort(2, 4, 100000);
+ }
+
+ private static void showStats() {
+ System.out.println(bm.getBatchesAdded());
+ System.out.println(bm.getReferenceHits());
+ System.out.println(bm.getReadAttempts());
+ System.out.println(bm.getReadCount());
+ System.out.println(bm.getWriteCount());
+ System.out.println(cache.getStorageReads());
+ System.out.println(cache.getStorageWrites());
+ }
+
/**
* Generates a 5 MB document
*/
11 years, 8 months
teiid SVN: r4560 - branches.
by teiid-commits@lists.jboss.org
Author: jolee
Date: 2013-04-17 14:21:23 -0400 (Wed, 17 Apr 2013)
New Revision: 4560
Added:
branches/7.7.x-TEIID-2429/
Log:
7.7.x-TEIID-2429
11 years, 8 months
[teiid/teiid] 9bc05d: TEIID-2470 allowing embedded to support zip deploy...
by shawkins
Branch: refs/heads/master
Home: https://github.com/teiid/teiid
Commit: 9bc05dad04de41812d6f4d3038009faa46ddf9e8
https://github.com/teiid/teiid/commit/9bc05dad04de41812d6f4d3038009faa46d...
Author: shawkins <shawkins(a)redhat.com>
Date: 2013-04-17 (Wed, 17 Apr 2013)
Changed paths:
M build/kits/jboss-as7/docs/teiid/teiid-releasenotes.html
A engine/src/main/java/org/teiid/query/metadata/PureZipFileSystem.java
M jboss-integration/src/main/java/org/teiid/jboss/VDBService.java
M metadata/src/test/java/org/teiid/metadata/index/VDBMetadataFactory.java
M runtime/src/main/java/org/teiid/runtime/AbstractVDBDeployer.java
M runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
M runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java
M test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log Message:
-----------
TEIID-2470 allowing embedded to support zip deployments
11 years, 8 months
[teiid/teiid] 630e27: TEIID-2467 removing the rest of the processing tha...
by shawkins
Branch: refs/heads/master
Home: https://github.com/teiid/teiid
Commit: 630e2736c504509441cb0910b5b52319df4d3fca
https://github.com/teiid/teiid/commit/630e2736c504509441cb0910b5b52319df4...
Author: shawkins <shawkins(a)redhat.com>
Date: 2013-04-13 (Sat, 13 Apr 2013)
Changed paths:
M api/src/main/java/org/teiid/translator/ExecutionFactory.java
M engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
M engine/src/main/java/org/teiid/dqp/internal/process/QueryProcessorFactoryImpl.java
M engine/src/main/java/org/teiid/dqp/internal/process/Request.java
R engine/src/main/java/org/teiid/query/eval/SecurityFunctionEvaluator.java
M engine/src/main/java/org/teiid/query/function/source/SecuritySystemFunctions.java
M engine/src/main/java/org/teiid/query/optimizer/QueryOptimizer.java
M engine/src/main/java/org/teiid/query/tempdata/TempTableDataManager.java
M engine/src/main/java/org/teiid/query/util/CommandContext.java
M engine/src/test/java/org/teiid/query/processor/TestSecurityFunctions.java
M test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViews.java
Log Message:
-----------
TEIID-2467 removing the rest of the processing that holds a thread
11 years, 8 months