[teiid-commits] teiid SVN: r2936 - in trunk/engine/src: main/java/org/teiid/dqp/internal/process and 1 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Wed Feb 23 23:55:54 EST 2011
Author: shawkins
Date: 2011-02-23 23:55:53 -0500 (Wed, 23 Feb 2011)
New Revision: 2936
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
Log:
TEIID-1463 limiting forward only cursors to a 20 batch buffer.
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-02-23 21:50:25 UTC (rev 2935)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-02-24 04:55:53 UTC (rev 2936)
@@ -245,6 +245,24 @@
}
}
+ /**
+ * Returns the total number of rows contained in managed batches
+ * @return
+ */
+ public int getManagedRowCount() {
+ if (!this.batches.isEmpty()) {
+ int start = this.batches.firstKey();
+ return rowCount - start + 1;
+ } else if (this.batchBuffer != null) {
+ return this.batchBuffer.size();
+ }
+ return 0;
+ }
+
+ /**
+ * Returns the last row number
+ * @return
+ */
public int getRowCount() {
return rowCount;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-02-23 21:50:25 UTC (rev 2935)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-02-24 04:55:53 UTC (rev 2936)
@@ -416,6 +416,13 @@
add = sendResultsIfNeeded(batch);
if (!added) {
super.flushBatchDirect(batch, add);
+ //restrict the buffer size for forward only results
+ if (add
+ && !batch.getTerminationFlag()
+ && this.getTupleBuffer().getManagedRowCount() >= 20 * this.getTupleBuffer().getBatchSize()) {
+ //requestMore will trigger more processing
+ throw BlockedException.INSTANCE;
+ }
}
}
};
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-02-23 21:50:25 UTC (rev 2935)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-02-24 04:55:53 UTC (rev 2936)
@@ -38,8 +38,10 @@
import org.teiid.cache.DefaultCacheFactory;
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
+import org.teiid.common.buffer.impl.BufferManagerImpl;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
import org.teiid.dqp.internal.datamgr.FakeTransactionService;
+import org.teiid.dqp.internal.process.AbstractWorkItem.ThreadState;
import org.teiid.dqp.service.AutoGenDataService;
import org.teiid.dqp.service.FakeBufferService;
import org.teiid.query.unittest.FakeMetadataFactory;
@@ -181,6 +183,44 @@
@Test public void testCancel() throws Exception {
assertFalse(this.core.cancelRequest(1L));
}
+
+ @Test public void testBufferLimit() throws Exception {
+ //the sql should return 100 rows
+ String sql = "SELECT A.IntKey FROM BQT1.SmallA as A, BQT1.SmallA as B"; //$NON-NLS-1$
+ String userName = "1"; //$NON-NLS-1$
+ String sessionid = "1"; //$NON-NLS-1$
+
+ RequestMessage reqMsg = exampleRequestMessage(sql);
+ reqMsg.setCursorType(ResultSet.TYPE_FORWARD_ONLY);
+ DQPWorkContext.getWorkContext().getSession().setSessionId(sessionid);
+ DQPWorkContext.getWorkContext().getSession().setUserName(userName);
+ ((BufferManagerImpl)core.getBufferManager()).setProcessorBatchSize(2);
+ Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+ ResultsMessage rm = message.get(5000, TimeUnit.MILLISECONDS);
+ assertNull(rm.getException());
+ assertEquals(2, rm.getResults().length);
+ RequestWorkItem item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+
+ message = core.processCursorRequest(reqMsg.getExecutionId(), 3, 2);
+ rm = message.get(5000, TimeUnit.MILLISECONDS);
+ assertNull(rm.getException());
+ assertEquals(2, rm.getResults().length);
+ //ensure that we are idle
+ for (int i = 0; i < 10 && item.getThreadState() != ThreadState.IDLE; i++) {
+ Thread.sleep(100);
+ }
+ assertEquals(ThreadState.IDLE, item.getThreadState());
+ assertEquals(46, item.resultsBuffer.getRowCount());
+ //pull the rest of the results
+ for (int j = 0; j < 48; j++) {
+ item = core.getRequestWorkItem(DQPWorkContext.getWorkContext().getRequestID(reqMsg.getExecutionId()));
+
+ message = core.processCursorRequest(reqMsg.getExecutionId(), j * 2 + 5, 2);
+ rm = message.get(5000, TimeUnit.MILLISECONDS);
+ assertNull(rm.getException());
+ assertEquals(2, rm.getResults().length);
+ }
+ }
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
More information about the teiid-commits
mailing list