Author: jolee
Date: 2013-02-27 15:22:37 -0500 (Wed, 27 Feb 2013)
New Revision: 4555
Modified:
branches/7.7.x/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
Log:
TEIID-2410: issues with output buffer blocking
Modified:
branches/7.7.x/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
===================================================================
---
branches/7.7.x/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java 2013-02-27
17:16:02 UTC (rev 4554)
+++
branches/7.7.x/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java 2013-02-27
20:22:37 UTC (rev 4555)
@@ -174,8 +174,16 @@
};
public static final int MAX_STRING_LENGTH = 4000;
- public static final int MAX_LOB_MEMORY_BYTES = 1 << 13;
+ public static final int MAX_LOB_MEMORY_BYTES = Math.max(nextPowOf2(2*MAX_STRING_LENGTH),
1<<13);
+ public static int nextPowOf2(int val) {
+ int result = 1;
+ while (result < val) {
+ result <<= 1;
+ }
+ return result;
+ }
+
public static final class DataTypeAliases {
public static final String VARCHAR = "varchar"; //$NON-NLS-1$
public static final String TINYINT = "tinyint"; //$NON-NLS-1$
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-02-27
17:16:02 UTC (rev 4554)
+++
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2013-02-27
20:22:37 UTC (rev 4555)
@@ -114,7 +114,7 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run",
impl.reserveBatchBytes.get(), impl.maxReserveBytes.get(), impl.activeBatchBytes.get());
//$NON-NLS-1$
}
- impl.doEvictions(0, false);
+ impl.doEvictions(0, !agingOut);
if (!agingOut) {
try {
Thread.sleep(100); //we don't want to evict too fast, because the processing
threads are more than capable of evicting
@@ -691,13 +691,13 @@
}
return;
}
- long maxToFree = Math.max(maxProcessingBytes>>1, reserveBatch>>3);
+ long maxToFree = Math.min(maxProcessingBytes, (activeBatch - reserveBatch)<<1);
doEvictions(maxToFree, true);
}
void doEvictions(long maxToFree, boolean checkActiveBatch) {
int freed = 0;
- while (freed <= maxToFree && (!checkActiveBatch || activeBatchBytes.get()
> reserveBatchBytes.get() * .8)) {
+ while (freed <= maxToFree && (!checkActiveBatch || (maxToFree == 0
&& activeBatchBytes.get() > reserveBatchBytes.get() * .7) || (maxToFree > 0
&& activeBatchBytes.get() > reserveBatchBytes.get() * .8))) {
CacheEntry ce = evictionQueue.firstEntry(true);
if (ce == null) {
break;
@@ -715,9 +715,14 @@
} finally {
synchronized (ce) {
if (evicted && memoryEntries.remove(ce.getId()) != null) {
- freed += ce.getSizeEstimate();
+ if (maxToFree > 0) {
+ freed += ce.getSizeEstimate();
+ }
activeBatchBytes.addAndGet(-ce.getSizeEstimate());
evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
+ if (!checkActiveBatch) {
+ break;
+ }
}
}
}
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2013-02-27
17:16:02 UTC (rev 4554)
+++
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2013-02-27
20:22:37 UTC (rev 4555)
@@ -47,7 +47,7 @@
private static Map<Class<?>, int[]> SIZE_ESTIMATES = new
HashMap<Class<?>, int[]>(128);
private static Set<Class<?>> VARIABLE_SIZE_TYPES = new
HashSet<Class<?>>();
static {
- SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.STRING, new int[] {100, 256});
+ SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.STRING, new int[] {100,
Math.max(100, DataTypeManager.nextPowOf2(DataTypeManager.MAX_STRING_LENGTH/16))});
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.DATE, new int[] {20, 28});
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIME, new int[] {20, 28});
SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIMESTAMP, new int[] {20, 28});
Modified: branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2013-02-27
17:16:02 UTC (rev 4554)
+++
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2013-02-27
20:22:37 UTC (rev 4555)
@@ -431,15 +431,18 @@
}
}
- public boolean hasWaitingPlans(RequestWorkItem item) {
- synchronized (waitingPlans) {
- if (!waitingPlans.isEmpty()) {
- return true;
- }
- this.bufferFullPlans.add(item);
- }
- return false;
- }
+ public boolean blockOnOutputBuffer(RequestWorkItem item) {
+ synchronized (waitingPlans) {
+ if (!waitingPlans.isEmpty()) {
+ return false;
+ }
+ if (item.useCallingThread || item.getDqpWorkContext().getSession().isEmbedded())
{
+ return false;
+ }
+ this.bufferFullPlans.add(item);
+ }
+ return true;
+ }
void removeRequest(final RequestWorkItem workItem) {
finishProcessing(workItem);
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2013-02-27
17:16:02 UTC (rev 4554)
+++
branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2013-02-27
20:22:37 UTC (rev 4555)
@@ -550,11 +550,12 @@
super.flushBatchDirect(batch, add);
if (!add && !processor.hasFinalBuffer()) {
resultsBuffer.setRowCount(batch.getEndRow());
- } else if (!processor.hasFinalBuffer() //restrict the buffer size for forward only
results
+ } else if (isForwardOnly() && add
+ && !processor.hasFinalBuffer() //restrict the buffer size for forward only
results
&& !batch.getTerminationFlag()
&& transactionState != TransactionState.ACTIVE
- && this.getTupleBuffer().getManagedRowCount() >=
OUTPUT_BUFFER_MAX_BATCHES * this.getTupleBuffer().getBatchSize()) {
- if (!dqpCore.hasWaitingPlans(RequestWorkItem.this)) {
+ && resultsBuffer.getManagedRowCount() >= OUTPUT_BUFFER_MAX_BATCHES *
resultsBuffer.getBatchSize()) {
+ if (dqpCore.blockOnOutputBuffer(RequestWorkItem.this)) {
//requestMore will trigger more processing
throw BlockedException.block(requestID, "Blocking due to full results
TupleBuffer", //$NON-NLS-1$
this.getTupleBuffer().getId(), "rows",
this.getTupleBuffer().getManagedRowCount(), "batch size",
this.getTupleBuffer().getBatchSize()); //$NON-NLS-1$ //$NON-NLS-2$
Modified:
branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
---
branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2013-02-27
17:16:02 UTC (rev 4554)
+++
branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2013-02-27
20:22:37 UTC (rev 4555)
@@ -345,8 +345,30 @@
break;
}
}
+
+ //insensitive should not block
+ reqMsg.setCursorType(ResultSet.TYPE_SCROLL_INSENSITIVE);
+
+ message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+ rm = message.get(500000, TimeUnit.MILLISECONDS);
+ assertNull(rm.getException());
+
+ assertEquals(rowsPerBatch, rm.getResultsList().size());
+ item =
core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+
+ message = core.processCursorRequest(reqMsg.getExecutionId(), 9, rowsPerBatch);
+ rm = message.get(500000, TimeUnit.MILLISECONDS);
+ assertNull(rm.getException());
+ assertEquals(rowsPerBatch, rm.getResultsList().size());
+ //ensure that we are idle
+ for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++)
{
+ Thread.sleep(100);
+ }
+ assertEquals(ThreadState.IDLE, item.getThreadState());
+ assertEquals(item.resultsBuffer.getManagedRowCount(), 400); //should have the full
results
}
+
@Test public void testBufferReuse() throws Exception {
//the sql should return 100 rows
String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B ORDER
BY A.IntKey"; //$NON-NLS-1$