[teiid-commits] teiid SVN: r2936 - in trunk/engine/src: main/java/org/teiid/dqp/internal/process and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Feb 23 23:55:54 EST 2011


Author: shawkins
Date: 2011-02-23 23:55:53 -0500 (Wed, 23 Feb 2011)
New Revision: 2936

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
Log:
TEIID-1463 limiting forward only cursors to a 20 batch buffer.  

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-02-23 21:50:25 UTC (rev 2935)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-02-24 04:55:53 UTC (rev 2936)
@@ -245,6 +245,24 @@
 		}
 	}
 	
+	/**
+	 * Returns the total number of rows contained in managed batches, or the size of the batchBuffer when no batches are held
+	 * @return <code>rowCount - batches.firstKey() + 1</code> if batches is non-empty, else the batchBuffer size if it is non-null, else 0
+	 */
+	public int getManagedRowCount() {
+		if (!this.batches.isEmpty()) {
+			int start = this.batches.firstKey(); //presumably the first row number of the earliest held batch - TODO confirm
+			return rowCount - start + 1;
+		} else if (this.batchBuffer != null) {
+			return this.batchBuffer.size();
+		} 
+		return 0;
+	}
+	
+	/**
+	 * Returns the last row number
+	 * @return the current value of rowCount
+	 */
 	public int getRowCount() {
 		return rowCount;
 	}

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-02-23 21:50:25 UTC (rev 2935)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-02-24 04:55:53 UTC (rev 2936)
@@ -416,6 +416,13 @@
 				add = sendResultsIfNeeded(batch);
 				if (!added) {
 					super.flushBatchDirect(batch, add);
+					//restrict the buffer size for forward only results
+					if (add 
+							&& !batch.getTerminationFlag() 
+							&& this.getTupleBuffer().getManagedRowCount() >= 20 * this.getTupleBuffer().getBatchSize()) {
+						//requestMore will trigger more processing
+						throw BlockedException.INSTANCE;
+					}
 				}
 			}
 		};

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-02-23 21:50:25 UTC (rev 2935)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-02-24 04:55:53 UTC (rev 2936)
@@ -38,8 +38,10 @@
 import org.teiid.cache.DefaultCacheFactory;
 import org.teiid.client.RequestMessage;
 import org.teiid.client.ResultsMessage;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
 import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
 import org.teiid.dqp.internal.datamgr.FakeTransactionService;
+import org.teiid.dqp.internal.process.AbstractWorkItem.ThreadState;
 import org.teiid.dqp.service.AutoGenDataService;
 import org.teiid.dqp.service.FakeBufferService;
 import org.teiid.query.unittest.FakeMetadataFactory;
@@ -181,6 +183,44 @@
 	@Test public void testCancel() throws Exception {
 		assertFalse(this.core.cancelRequest(1L));
 	}
+	
+    @Test public void testBufferLimit() throws Exception {
+    	//the sql should return 100 rows, which are fetched below 2 at a time (2 + 2 + 48 * 2)
+        String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B"; //$NON-NLS-1$
+        String userName = "1"; //$NON-NLS-1$
+        String sessionid = "1"; //$NON-NLS-1$
+        
+        RequestMessage reqMsg = exampleRequestMessage(sql);
+        reqMsg.setCursorType(ResultSet.TYPE_FORWARD_ONLY);
+        DQPWorkContext.getWorkContext().getSession().setSessionId(sessionid);
+        DQPWorkContext.getWorkContext().getSession().setUserName(userName);
+        ((BufferManagerImpl)core.getBufferManager()).setProcessorBatchSize(2);
+        Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+        ResultsMessage rm = message.get(5000, TimeUnit.MILLISECONDS);
+        assertNull(rm.getException());
+        assertEquals(2, rm.getResults().length);
+        RequestWorkItem item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+
+        message = core.processCursorRequest(reqMsg.getExecutionId(), 3, 2);
+        rm = message.get(5000, TimeUnit.MILLISECONDS);
+        assertNull(rm.getException());
+        assertEquals(2, rm.getResults().length);
+        //ensure that we are idle - polls up to 10 times, 100ms apart, then asserts the IDLE state
+        for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
+        	Thread.sleep(100);
+        }
+        assertEquals(ThreadState.IDLE, item.getThreadState());
+        assertEquals(46, item.resultsBuffer.getRowCount()); //processing is expected to have stopped at the forward only buffer limit rather than producing all 100 rows
+        //pull the rest of the results - 48 more requests (j * 2 + 5 = 5, 7, ... 99), each expected to return 2 rows
+        for (int j = 0; j < 48; j++) {
+            item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+
+	        message = core.processCursorRequest(reqMsg.getExecutionId(), j * 2 + 5, 2);
+	        rm = message.get(5000, TimeUnit.MILLISECONDS);
+	        assertNull(rm.getException());
+	        assertEquals(2, rm.getResults().length);
+        }
+    }
     
 	public void helpTestVisibilityFails(String sql) throws Exception {
         RequestMessage reqMsg = exampleRequestMessage(sql); 



More information about the teiid-commits mailing list