[teiid-commits] teiid SVN: r4555 - in branches/7.7.x: engine/src/main/java/org/teiid/common/buffer/impl and 2 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Feb 27 15:22:38 EST 2013


Author: jolee
Date: 2013-02-27 15:22:37 -0500 (Wed, 27 Feb 2013)
New Revision: 4555

Modified:
   branches/7.7.x/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
   branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
   branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
Log:
TEIID-2410: issues with output buffer blocking

Modified: branches/7.7.x/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java
===================================================================
--- branches/7.7.x/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java	2013-02-27 17:16:02 UTC (rev 4554)
+++ branches/7.7.x/common-core/src/main/java/org/teiid/core/types/DataTypeManager.java	2013-02-27 20:22:37 UTC (rev 4555)
@@ -174,8 +174,16 @@
 	};
 
 	public static final int MAX_STRING_LENGTH = 4000;
-	public static final int MAX_LOB_MEMORY_BYTES = 1 << 13;
+	public static final int MAX_LOB_MEMORY_BYTES = Math.max(nextPowOf2(2*MAX_STRING_LENGTH), 1<<13);
 	
+	public static int nextPowOf2(int val) {
+		int result = 1;
+        while (result < val) {
+        	result <<= 1;
+        }
+        return result;
+	}
+	
 	public static final class DataTypeAliases {
 		public static final String VARCHAR = "varchar"; //$NON-NLS-1$
 		public static final String TINYINT = "tinyint"; //$NON-NLS-1$

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2013-02-27 17:16:02 UTC (rev 4554)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2013-02-27 20:22:37 UTC (rev 4555)
@@ -114,7 +114,7 @@
 				if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 					LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Asynch eviction run", impl.reserveBatchBytes.get(), impl.maxReserveBytes.get(), impl.activeBatchBytes.get()); //$NON-NLS-1$
 				}
-				impl.doEvictions(0, false);
+				impl.doEvictions(0, !agingOut);
 				if (!agingOut) {
 					try {
 						Thread.sleep(100); //we don't want to evict too fast, because the processing threads are more than capable of evicting
@@ -691,13 +691,13 @@
 			}
 			return;
 		}
-		long maxToFree = Math.max(maxProcessingBytes>>1, reserveBatch>>3);
+		long maxToFree = Math.min(maxProcessingBytes, (activeBatch - reserveBatch)<<1);
 		doEvictions(maxToFree, true);
 	}
 
 	void doEvictions(long maxToFree, boolean checkActiveBatch) {
 		int freed = 0;
-		while (freed <= maxToFree && (!checkActiveBatch || activeBatchBytes.get() > reserveBatchBytes.get() * .8)) {
+		while (freed <= maxToFree && (!checkActiveBatch || (maxToFree == 0 && activeBatchBytes.get() > reserveBatchBytes.get() * .7) || (maxToFree > 0 && activeBatchBytes.get() > reserveBatchBytes.get() * .8))) {
 			CacheEntry ce = evictionQueue.firstEntry(true);
 			if (ce == null) {
 				break;
@@ -715,9 +715,14 @@
 			} finally {
 				synchronized (ce) {
 					if (evicted && memoryEntries.remove(ce.getId()) != null) {
-						freed += ce.getSizeEstimate();
+						if (maxToFree > 0) {
+							freed += ce.getSizeEstimate();
+						}
 						activeBatchBytes.addAndGet(-ce.getSizeEstimate());
 						evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
+						if (!checkActiveBatch) {
+							break;
+						}
 					}
 				}
 			}

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2013-02-27 17:16:02 UTC (rev 4554)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2013-02-27 20:22:37 UTC (rev 4555)
@@ -47,7 +47,7 @@
 	private static Map<Class<?>, int[]> SIZE_ESTIMATES = new HashMap<Class<?>, int[]>(128);
 	private static Set<Class<?>> VARIABLE_SIZE_TYPES = new HashSet<Class<?>>();
 	static {
-		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.STRING, new int[] {100, 256});
+		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.STRING, new int[] {100, Math.max(100, DataTypeManager.nextPowOf2(DataTypeManager.MAX_STRING_LENGTH/16))});
 		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.DATE, new int[] {20, 28});
 		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIME, new int[] {20, 28});
 		SIZE_ESTIMATES.put(DataTypeManager.DefaultDataClasses.TIMESTAMP, new int[] {20, 28});

Modified: branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2013-02-27 17:16:02 UTC (rev 4554)
+++ branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java	2013-02-27 20:22:37 UTC (rev 4555)
@@ -431,15 +431,18 @@
 		}
     }
     
-    public boolean hasWaitingPlans(RequestWorkItem item) {
-    	synchronized (waitingPlans) {
-    		if (!waitingPlans.isEmpty()) {
-    			return true;
-    		}
-    		this.bufferFullPlans.add(item);
-		}
-    	return false;
-    }
+    public boolean blockOnOutputBuffer(RequestWorkItem item) {
+     	synchronized (waitingPlans) {
+     		if (!waitingPlans.isEmpty()) {
+     			return false;
+     		}
+     		if (item.useCallingThread || item.getDqpWorkContext().getSession().isEmbedded()) {
+     			return false;
+     		}
+     		this.bufferFullPlans.add(item);
+ 		}
+     	return true;
+     }
     
     void removeRequest(final RequestWorkItem workItem) {
     	finishProcessing(workItem);

Modified: branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2013-02-27 17:16:02 UTC (rev 4554)
+++ branches/7.7.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2013-02-27 20:22:37 UTC (rev 4555)
@@ -550,11 +550,12 @@
 					super.flushBatchDirect(batch, add);
 					if (!add && !processor.hasFinalBuffer()) {
 						 resultsBuffer.setRowCount(batch.getEndRow());
-					} else if (!processor.hasFinalBuffer() //restrict the buffer size for forward only results
+					} else if (isForwardOnly() && add 
+							&& !processor.hasFinalBuffer() //restrict the buffer size for forward only results
 							&& !batch.getTerminationFlag() 
 							&& transactionState != TransactionState.ACTIVE
-							&& this.getTupleBuffer().getManagedRowCount() >= OUTPUT_BUFFER_MAX_BATCHES * this.getTupleBuffer().getBatchSize()) {
-						if (!dqpCore.hasWaitingPlans(RequestWorkItem.this)) {
+							&& resultsBuffer.getManagedRowCount() >= OUTPUT_BUFFER_MAX_BATCHES * resultsBuffer.getBatchSize()) {
+						if (dqpCore.blockOnOutputBuffer(RequestWorkItem.this)) {
 							//requestMore will trigger more processing
 							throw BlockedException.block(requestID, "Blocking due to full results TupleBuffer", //$NON-NLS-1$
 									this.getTupleBuffer().getId(), "rows", this.getTupleBuffer().getManagedRowCount(), "batch size", this.getTupleBuffer().getBatchSize()); //$NON-NLS-1$ //$NON-NLS-2$ 

Modified: branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2013-02-27 17:16:02 UTC (rev 4554)
+++ branches/7.7.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2013-02-27 20:22:37 UTC (rev 4555)
@@ -345,8 +345,30 @@
         		break;
         	}
         }
+    
+	    //insensitive should not block
+	    reqMsg.setCursorType(ResultSet.TYPE_SCROLL_INSENSITIVE);
+	    
+	    message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+	    rm = message.get(500000, TimeUnit.MILLISECONDS);
+	    assertNull(rm.getException());
+	    		
+	    assertEquals(rowsPerBatch, rm.getResultsList().size());
+	    item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+	    		
+	    message = core.processCursorRequest(reqMsg.getExecutionId(), 9, rowsPerBatch);
+	    rm = message.get(500000, TimeUnit.MILLISECONDS);
+	    assertNull(rm.getException());
+	    assertEquals(rowsPerBatch, rm.getResultsList().size());
+	    //ensure that we are idle
+	    for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
+	    	Thread.sleep(100);
+	    }
+	    assertEquals(ThreadState.IDLE, item.getThreadState());
+	    assertEquals(item.resultsBuffer.getManagedRowCount(), 400); //should have the full results
     }
     
+    
     @Test public void testBufferReuse() throws Exception {
     	//the sql should return 100 rows
         String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B ORDER BY A.IntKey"; //$NON-NLS-1$



More information about the teiid-commits mailing list