[teiid-commits] teiid SVN: r1239 - in trunk: client-jdbc/src/main/java/com/metamatrix/jdbc and 8 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Wed Aug 12 23:32:15 EDT 2009
Author: shawkins
Date: 2009-08-12 23:32:14 -0400 (Wed, 12 Aug 2009)
New Revision: 1239
Modified:
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchFetcher.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java
trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestBatchResults.java
trunk/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceImpl.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
Log:
TEIID-775 TEIID-767 changing cursor requests to use begin row and fetchsize, simplifying the buffermanager to not reshape pinned batches or require an endrow.
Modified: trunk/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java
===================================================================
--- trunk/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client/src/main/java/com/metamatrix/dqp/client/ClientSideDQP.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -39,7 +39,7 @@
ResultsFuture<ResultsMessage> executeRequest(long reqID, RequestMessage message) throws MetaMatrixProcessingException, MetaMatrixComponentException;
- ResultsFuture<ResultsMessage> processCursorRequest(long reqID, int batchFirst, int batchLast) throws MetaMatrixProcessingException;
+ ResultsFuture<ResultsMessage> processCursorRequest(long reqID, int batchFirst, int fetchSize) throws MetaMatrixProcessingException;
ResultsFuture<?> closeRequest(long requestID) throws MetaMatrixProcessingException, MetaMatrixComponentException;
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchFetcher.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchFetcher.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchFetcher.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -30,5 +30,5 @@
* @since 4.3
*/
public interface BatchFetcher {
- Batch requestBatch(int beginRow, int endRow) throws SQLException;
+ Batch requestBatch(int beginRow) throws SQLException;
}
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/BatchResults.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -76,16 +76,13 @@
private int lastRowNumber = -1;
private int highestRowNumber;
private BatchFetcher batchFetcher;
- private int fetchSize;
- public BatchResults(List[] batch, int beginRow, int endRow, boolean isLast, int fetchSize) {
- this.fetchSize = fetchSize;
+ public BatchResults(List[] batch, int beginRow, int endRow, boolean isLast) {
this.setBatch(new Batch(batch, beginRow, endRow, isLast));
}
- public BatchResults(BatchFetcher batchFetcher, int fetchSize, Batch batch) {
+ public BatchResults(BatchFetcher batchFetcher, Batch batch) {
this.batchFetcher = batchFetcher;
- this.fetchSize = fetchSize;
this.setBatch(batch);
}
@@ -97,20 +94,12 @@
if (currentRowNumber == 0 || (lastRowNumber != -1 && currentRowNumber > lastRowNumber)) {
return null;
}
- int closestMax = currentRowNumber + fetchSize - 1;
- int closestMin = Math.max(1, currentRowNumber - fetchSize + 1);
for (int i = 0; i < batches.size(); i++) {
Batch batch = batches.get(i);
if (currentRowNumber < batch.getBeginRow()) {
- if (i != SAVED_BATCHES - 1) {
- closestMax = Math.min(batch.getBeginRow(), closestMax);
- }
continue;
}
if (currentRowNumber > batch.getEndRow()) {
- if (i != SAVED_BATCHES - 1) {
- closestMin = Math.max(batch.getEndRow(), closestMin);
- }
continue;
}
if (i != 0) {
@@ -118,17 +107,13 @@
}
return batch.getRow(currentRowNumber);
}
- if (closestMax - currentRowNumber >= currentRowNumber - closestMin) {
- requestBatchAndWait(currentRowNumber, closestMax);
- } else {
- requestBatchAndWait(currentRowNumber, closestMin);
- }
+ requestBatchAndWait(currentRowNumber);
Batch batch = batches.get(0);
return batch.getRow(currentRowNumber);
}
private void requestNextBatch() throws SQLException {
- requestBatchAndWait(highestRowNumber + 1, highestRowNumber + fetchSize);
+ requestBatchAndWait(highestRowNumber + 1);
}
public boolean next() throws SQLException{
@@ -165,10 +150,6 @@
this.batchFetcher = batchFetcher;
}
- public int getFetchSize() {
- return fetchSize;
- }
-
public boolean absolute(int row) throws SQLException {
return absolute(row, 0);
}
@@ -215,8 +196,8 @@
return currentRowNumber;
}
- private void requestBatchAndWait(int beginRow, int endRow) throws SQLException{
- setBatch(batchFetcher.requestBatch(beginRow, endRow));
+ private void requestBatchAndWait(int beginRow) throws SQLException{
+ setBatch(batchFetcher.requestBatch(beginRow));
}
private void setBatch(Batch batch) {
Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMResultSet.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -101,6 +101,7 @@
private Map updatedPlanDescription;
private ResultsMessage resultsMsg;
private int maxFieldSize;
+ private int fetchSize;
/**
* Constructor.
@@ -121,7 +122,7 @@
// server latency-related timestamp
this.processingTimestamp = resultsMsg.getProcessingTimestamp();
this.requestID = statement.getCurrentRequestID();
- this.batchResults = new BatchResults(this, resultsMsg.getFetchSize(), getCurrentBatch(resultsMsg));
+ this.batchResults = new BatchResults(this, getCurrentBatch(resultsMsg));
setResultsData(resultsMsg);
cursorType = statement.getResultSetType();
this.serverTimeZone = statement.getServerTimeZone();
@@ -141,6 +142,7 @@
if (this.parameters > 0) {
rmetadata = FilteredResultsMetadata.newInstance(rmetadata, resultColumns);
}
+ this.fetchSize = resultsMsg.getFetchSize();
}
public void setMaxFieldSize(int maxFieldSize) {
@@ -230,7 +232,7 @@
* have been reset by the server.
*/
public int getFetchSize() throws SQLException {
- return this.batchResults.getFetchSize();
+ return this.fetchSize;
}
/**
@@ -341,11 +343,11 @@
return updatedPlanDescription;
}
- public Batch requestBatch(int beginRow, int endRow) throws SQLException{
- logger.fine("CursorResultsImpl.requestBatch] thread name: " + Thread.currentThread().getName() + " requestID: " + requestID + " beginRow: " + beginRow + " endinRow: " + endRow ); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
+ public Batch requestBatch(int beginRow) throws SQLException{
+ logger.fine("CursorResultsImpl.requestBatch] thread name: " + Thread.currentThread().getName() + " requestID: " + requestID + " beginRow: " + beginRow ); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
checkClosed();
try {
- ResultsFuture<ResultsMessage> results = statement.getDQP().processCursorRequest(requestID, beginRow, endRow);
+ ResultsFuture<ResultsMessage> results = statement.getDQP().processCursorRequest(requestID, beginRow, fetchSize);
ResultsMessage currentResultMsg = results.get();
this.setResultsData(currentResultMsg);
this.updatedPlanDescription = currentResultMsg.getPlanDescription();
@@ -1516,7 +1518,16 @@
}
public void setFetchSize(int rows) throws SQLException {
- throw SqlUtil.createFeatureNotSupportedException();
+ checkClosed();
+ if ( rows < 0 ) {
+ throw new MMSQLException(JDBCPlugin.Util.getString("MMStatement.Invalid_fetch_size")); //$NON-NLS-1$
+ }
+ // sets the fetch size on this statement
+ if (rows == 0) {
+ this.fetchSize = BaseDataSource.DEFAULT_FETCH_SIZE;
+ } else {
+ this.fetchSize = rows;
+ }
}
public void updateArray(int columnIndex, Array x) throws SQLException {
@@ -1527,7 +1538,6 @@
throw SqlUtil.createFeatureNotSupportedException();
}
-
public void updateAsciiStream(int columnIndex, InputStream x, int length)
throws SQLException {
throw SqlUtil.createFeatureNotSupportedException();
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestAllResultsImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -783,13 +783,13 @@
//forward requests
ResultsFuture<ResultsMessage> nextBatch = mock(ResultsFuture.class);
stub(nextBatch.get()).toReturn(exampleResultsMsg4(i + 1, Math.min(batchLength, totalLength - i), fetchSize, i + batchLength >= totalLength));
- stub(dqp.processCursorRequest(REQUEST_ID, i + 1, i + fetchSize)).toReturn(nextBatch);
+ stub(dqp.processCursorRequest(REQUEST_ID, i + 1, fetchSize)).toReturn(nextBatch);
if (i + batchLength < totalLength) {
//backward requests
ResultsFuture<ResultsMessage> previousBatch = mock(ResultsFuture.class);
stub(previousBatch.get()).toReturn(exampleResultsMsg4(i - batchLength + 1, i, fetchSize, false));
- stub(dqp.processCursorRequest(REQUEST_ID, i, i - fetchSize + 1)).toReturn(previousBatch);
+ stub(dqp.processCursorRequest(REQUEST_ID, i, fetchSize)).toReturn(previousBatch);
}
}
Modified: trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestBatchResults.java
===================================================================
--- trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestBatchResults.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/client-jdbc/src/test/java/com/metamatrix/jdbc/TestBatchResults.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -41,7 +41,7 @@
private int totalRows = 50;
private boolean throwException;
- List batchCalls = new ArrayList<int[]>();
+ List<Integer> batchCalls = new ArrayList<Integer>();
public MockBatchFetcher() {
@@ -51,12 +51,16 @@
this.totalRows = totalRows;
}
- public Batch requestBatch(int beginRow, int endRow) throws SQLException {
- batchCalls.add(new int[] {beginRow, endRow});
+ public Batch requestBatch(int beginRow) throws SQLException {
+ batchCalls.add(beginRow);
if (throwException) {
throw new SQLException();
}
boolean isLast = false;
+ int endRow = beginRow + 9;
+ if (beginRow%10==0) {
+ endRow = beginRow - 9;
+ }
if(beginRow > endRow) {
if(endRow < 1) {
endRow = 1;
@@ -98,14 +102,14 @@
public void testGetCurrentRow1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertNull(batchResults.getCurrentRow());
batchResults.next();
assertNull(batchResults.getCurrentRow());
}
public void testGetCurrentRow2() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, true);
assertNull(batchResults.getCurrentRow());
batchResults.next();
List expectedResult = new ArrayList();
@@ -115,30 +119,30 @@
public void testHasNext1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.hasNext());
}
public void testHasNext2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertTrue(batchResults.hasNext());
}
public void testHasNext3() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, true);
assertTrue(batchResults.hasNext());
}
public void testNext1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.next());
}
public void testNext2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertTrue(batchResults.next());
List expectedResult = new ArrayList();
expectedResult.add(new Integer(1));
@@ -148,7 +152,7 @@
public void testNext3() throws Exception{
//one row batch, multiple batches
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertTrue(batchResults.next());
assertTrue(batchResults.next());
@@ -158,7 +162,7 @@
}
public void testNext4() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
int i;
for(i=0; i<10; i++) {
@@ -177,13 +181,13 @@
public void testHasPrevious1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.hasPrevious());
}
public void testHasPrevious2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertFalse(batchResults.hasPrevious());
batchResults.next();
assertFalse(batchResults.hasPrevious());
@@ -193,13 +197,13 @@
public void testPrevious1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.previous());
}
public void testPrevious2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertTrue(batchResults.next());
assertFalse(batchResults.previous());
List expectedResult = new ArrayList();
@@ -212,7 +216,7 @@
public void testPrevious3() throws Exception{
//one row batch, multiple batches
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertFalse(batchResults.previous());
assertTrue(batchResults.next());
@@ -232,7 +236,7 @@
}
public void testPrevious4() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
int i;
for(i=0; i<=10; i++) {
@@ -248,14 +252,14 @@
public void testAbsolute1() throws Exception{
//empty batch
- BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true, 100);
+ BatchResults batchResults = new BatchResults(createEmptyBatch(), 0, 0, true);
assertFalse(batchResults.absolute(0));
assertFalse(batchResults.absolute(1));
}
public void testAbsolute2() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertFalse(batchResults.absolute(0));
assertTrue(batchResults.absolute(1));
@@ -266,7 +270,7 @@
}
public void testAbsolute3() throws Exception{
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false);
batchResults.setBatchFetcher(new MockBatchFetcher(200));
assertFalse(batchResults.absolute(0));
assertTrue(batchResults.absolute(11));
@@ -286,7 +290,7 @@
//move backwards with absolute
public void testAbsolute4() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertTrue(batchResults.absolute(10));
assertTrue(batchResults.absolute(2));
@@ -297,7 +301,7 @@
public void testAbsolute5() throws Exception{
//one row batch
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
batchResults.setBatchFetcher(new MockBatchFetcher());
assertTrue(batchResults.absolute(-1));
List expectedResult = new ArrayList();
@@ -308,7 +312,7 @@
}
public void testCurrentRowNumber() throws Exception {
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, true);
assertEquals(0, batchResults.getCurrentRowNumber());
batchResults.next();
assertEquals(1, batchResults.getCurrentRowNumber());
@@ -319,7 +323,7 @@
}
public void testSetException() throws Exception {
- BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false, 100);
+ BatchResults batchResults = new BatchResults(createBatch(1, 1), 1, 1, false);
MockBatchFetcher batchFetcher = new MockBatchFetcher();
batchResults.setBatchFetcher(batchFetcher);
batchFetcher.throwException();
@@ -332,7 +336,7 @@
}
public void testBatching() throws Exception {
- BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false, 10);
+ BatchResults batchResults = new BatchResults(createBatch(1, 10), 1, 10, false);
MockBatchFetcher batchFetcher = new MockBatchFetcher(60);
batchResults.setBatchFetcher(batchFetcher);
@@ -346,30 +350,29 @@
}
// verify batch calls
- checkResults(new int[][] {
+ checkResults(new int[] {
// going forwards - end > begin
- new int[] { 11, 20 },
- new int[] { 21, 30 },
- new int[] { 31, 40 },
- new int[] { 41, 50 },
+ 11,
+ 21,
+ 31,
+ 41,
// going backwards - begin > end
// last 3 batches were saved, only need the first 2 again
- new int[] { 20, 11 },
- new int[] { 10, 1 },
+ 20,
+ 10,
}, batchFetcher.batchCalls);
assertTrue(batchResults.absolute(50));
assertEquals(new Integer(50), batchResults.getCurrentRow().get(0));
}
- private void checkResults(int[][] expectedCalls, List<int[]> batchCalls) {
+ private void checkResults(int[] expectedCalls, List<Integer> batchCalls) {
assertEquals(expectedCalls.length, batchCalls.size());
for(int i=0; i<batchCalls.size(); i++) {
- int[] range = batchCalls.get(i);
- int[] expected = expectedCalls[i];
- assertEquals("On call " + i + " expected different begin", expected[0], range[0]);
- assertEquals("On call " + i + " expected different end", expected[1], range[1]);
+ int range = batchCalls.get(i);
+ int expected = expectedCalls[i];
+ assertEquals("On call " + i + " expected different begin", expected, range);
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -147,26 +147,24 @@
*
* @param tupleSourceID Tuple source identifier
* @param beginRow First row index to return
- * @param maxEndRow Maximum last row index to return, may be less actually returned
* @return Batch of rows starting from beginRow and not past maxEndRow
* @throws TupleSourceNotFoundException if tuple source could not be found
* @throws MetaMatrixComponentException indicating a non-business-related
* exception (such as a communication exception)
* @throws MemoryNotAvailableException If memory was not available for the pin
*/
- TupleBatch pinTupleBatch(TupleSourceID tupleSourceID, int beginRow, int maxEndRow)
+ TupleBatch pinTupleBatch(TupleSourceID tupleSourceID, int beginRow)
throws TupleSourceNotFoundException, MemoryNotAvailableException, MetaMatrixComponentException;
/**
* Unpins a range of rows from the given tuple source
* @param tupleSourceID Tuple source identifier
* @param firstRow First row to unpin
- * @param lastRow Last row to unpin (inclusive)
* @throws TupleSourceNotFoundException if tuple source could not be found
* @throws MetaMatrixComponentException indicating a non-business-related
* exception (such as a communication exception)
*/
- void unpinTupleBatch(TupleSourceID tupleSourceID, int firstRow, int lastRow)
+ void unpinTupleBatch(TupleSourceID tupleSourceID, int firstRow)
throws TupleSourceNotFoundException, MetaMatrixComponentException;
/**
@@ -278,14 +276,16 @@
* @throws TupleSourceNotFoundException
* @throws MetaMatrixComponentException
*/
- public Streamable<?> getStreamable(TupleSourceID id, String referenceId)
+ Streamable<?> getStreamable(TupleSourceID id, String referenceId)
throws TupleSourceNotFoundException, MetaMatrixComponentException;
/**
- * Assign the tuplesource as the persistent stream for the streamable
+ * Assign the {@link TupleSource} as the persistent stream for the {@link Streamable}
* @param id
* @param s
* @throws TupleSourceNotFoundException
*/
- public void setPersistentTupleSource(TupleSourceID id, Streamable<? extends Object> s) throws TupleSourceNotFoundException;
+ void setPersistentTupleSource(TupleSourceID id, Streamable<? extends Object> s)
+ throws TupleSourceNotFoundException;
+
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -460,8 +460,6 @@
// Look up info
TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
- // if there are lobs in source then we need to keep manage then
- // in a separate tuple sources.
if (info.lobsInSource()) {
correctLobReferences(info, tupleBatch);
}
@@ -477,7 +475,9 @@
synchronized(info) {
if(info.isRemoved()) {
- releaseMemory(bytes, info.getGroupInfo());
+ if(location != ManagedBatch.PERSISTENT) {
+ releaseMemory(bytes, info.getGroupInfo());
+ }
throw new TupleSourceNotFoundException(QueryExecPlugin.Util.getString("BufferManagerImpl.tuple_source_not_found", tupleSourceID)); //$NON-NLS-1$
}
@@ -510,22 +510,22 @@
info.setRowCount(tupleBatch.getEndRow());
}
}
-
+
/**
- * Pin a tuple source in memory and return it. This batch must be unpinned by
+ * Pin a tuple batch in memory and return it. This batch must be unpinned by
* passed the identical tuple source ID and beginning row.
+ * NOTE: the returned {@link TupleBatch} will have the exact same bounds as the {@link ManagedBatch}
* @param tupleSourceID Tuple source identifier
* @param beginRow Beginning row
- * @param maxEndRow Max end row to return
* @throws TupleSourceNotFoundException If tuple source not found
* @throws MetaMatrixComponentException If an internal server error occurred
* @throws MemoryNotAvailableException If memory was not available for the pin
*/
- public TupleBatch pinTupleBatch(TupleSourceID tupleSourceID, int beginRow, int maxEndRow)
+ public TupleBatch pinTupleBatch(TupleSourceID tupleSourceID, int beginRow)
throws TupleSourceNotFoundException, MemoryNotAvailableException, MetaMatrixComponentException {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, new Object[]{"Pinning tupleBatch for", tupleSourceID, "beginRow:", beginRow, "maxEndRow:", maxEndRow}); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, new Object[]{"Pinning tupleBatch for", tupleSourceID, "beginRow:", beginRow}); //$NON-NLS-1$ //$NON-NLS-2$
}
this.pinRequests.incrementAndGet();
@@ -537,98 +537,63 @@
synchronized (info) {
mbatch = info.getBatch(beginRow);
}
- if(mbatch == null) {
- return new TupleBatch(beginRow, Collections.EMPTY_LIST);
- }
-
- int endRow = 0;
- int pass = 0;
- //if the client request previous batch, the end row
- //is smaller than the begin row
- if(beginRow > maxEndRow) {
- endRow = Math.min(beginRow, mbatch.getEndRow());
- beginRow = Math.max(maxEndRow, mbatch.getBeginRow());
- }else {
- endRow = Math.min(maxEndRow, mbatch.getEndRow());
- }
- int count = endRow - beginRow + 1;
- if (count == 0) {
+ if (mbatch == null || beginRow == mbatch.getEndRow() + 1) {
return new TupleBatch(beginRow, Collections.EMPTY_LIST);
}
long memoryRequiredByBatch = mbatch.getSize();
TupleBatch memoryBatch = null;
- while(pass < 2) {
- if(pass == 1) {
- if(memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_MAX) {
- clean(memoryRequiredByBatch, null);
- }else {
- //exceed session limit
- clean(memoryRequiredByBatch, info.getGroupInfo());
- }
- }
+ for (int pass = 0; pass < 2; pass++) {
+ if (memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_MAX) {
+ clean(memoryRequiredByBatch, null);
+ } else if (memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_SESSION_MAX) {
+ clean(memoryRequiredByBatch, info.getGroupInfo());
+ }
synchronized(info) {
if(mbatch.getLocation() == ManagedBatch.PINNED) {
// Load batch from memory - already pinned
memoryBatch = mbatch.getBatch();
-
- } else if(mbatch.getLocation() == ManagedBatch.UNPINNED) {
+ break;
+ }
+ if(mbatch.getLocation() == ManagedBatch.UNPINNED) {
// Already in memory - just move from unpinned to pinned
this.unpinned.remove(mbatch);
pin(mbatch);
// Load batch from memory
memoryBatch = mbatch.getBatch();
-
- } else {
- memoryRequiredByBatch = mbatch.getSize();
-
- // Try to reserve some memory
- if((memoryAvailability = reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) != BufferManagerImpl.MEMORY_AVAILABLE) {
- if(pass == 0) {
- // Break and try to clean - it is important to break out of the synchronized block
- // here so that the clean does not cause a deadlock on this TupleSourceInfo
- pass++;
- continue;
- }
+ break;
+ }
+ memoryRequiredByBatch = mbatch.getSize();
+
+ // Try to reserve some memory
+ if((memoryAvailability = reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) != BufferManagerImpl.MEMORY_AVAILABLE) {
+ continue;
+ }
- // Failed to reserve the memory even after a clean, so record the failure and fail the pin
- this.pinFailures.incrementAndGet();
+ this.pinnedFromDisk.incrementAndGet();
- // Couldn't reserve memory, so throw exception
- throw new MemoryNotAvailableException(QueryExecPlugin.Util.getString("BufferManagerImpl.no_memory_available")); //$NON-NLS-1$
- }
+ // Memory was reserved, so move from persistent to memory and pin
+ int internalBeginRow = mbatch.getBeginRow();
+ memoryBatch = diskMgr.getBatch(tupleSourceID, internalBeginRow, info.getTypes());
- this.pinnedFromDisk.incrementAndGet();
-
- // Memory was reserved, so move from persistent to memory and pin
- int internalBeginRow = mbatch.getBeginRow();
- memoryBatch = diskMgr.getBatch(tupleSourceID, internalBeginRow, info.getTypes());
-
- mbatch.setBatch(memoryBatch);
- if (info.lobsInSource()) {
- correctLobReferences(info, memoryBatch);
- }
- pin(mbatch);
+ mbatch.setBatch(memoryBatch);
+ if (info.lobsInSource()) {
+ correctLobReferences(info, memoryBatch);
}
+ pin(mbatch);
}
-
- break;
}
-
- // Batch should now be pinned in memory, so grab it and build a correctly
- // sized batch to return
- if(beginRow == memoryBatch.getBeginRow() && count == memoryBatch.getRowCount()) {
- return memoryBatch;
+
+ if (memoryBatch == null) {
+ // Failed to reserve the memory even after a clean, so record the failure and fail the pin
+ this.pinFailures.incrementAndGet();
+ throw new MemoryNotAvailableException(QueryExecPlugin.Util.getString("BufferManagerImpl.no_memory_available")); //$NON-NLS-1$
}
- int firstOffset = beginRow - memoryBatch.getBeginRow();
- List[] memoryRows = memoryBatch.getAllTuples();
- List[] rows = new List[count];
- System.arraycopy(memoryRows, firstOffset, rows, 0, count);
- return new TupleBatch(beginRow, rows);
+ return memoryBatch;
}
private void pin(ManagedBatch mbatch) {
@@ -647,7 +612,7 @@
* @throws TupleSourceNotFoundException If tuple source not found
* @throws MetaMatrixComponentException If an internal server error occurred
*/
- public void unpinTupleBatch(TupleSourceID tupleSourceID, int beginRow, int endRow)
+ public void unpinTupleBatch(TupleSourceID tupleSourceID, int beginRow)
throws TupleSourceNotFoundException, MetaMatrixComponentException {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
@@ -891,10 +856,8 @@
// Look up info
TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
- short location = ManagedBatch.PERSISTENT;
synchronized(info) {
-
List<LobChunk> data = new ArrayList<LobChunk>();
data.add(streamChunk);
TupleBatch batch = new TupleBatch(beginRow, new List[] {data});
@@ -902,7 +865,7 @@
// Update tuple source state (we could calculate the size of stream if need to)
ManagedBatch managedBatch = new ManagedBatch(tupleSourceID, beginRow, batch.getEndRow(), 0);
- managedBatch.setLocation(location);
+ managedBatch.setLocation(ManagedBatch.PERSISTENT);
// Update info with new rows
info.addBatch(managedBatch);
@@ -932,7 +895,7 @@
for (ManagedBatch managedBatch : pinnedByThread) {
try {
//TODO: add trace logging about the batch that is being unpinned
- unpinTupleBatch(managedBatch.getTupleSourceID(), managedBatch.getBeginRow(), managedBatch.getEndRow());
+ unpinTupleBatch(managedBatch.getTupleSourceID(), managedBatch.getBeginRow());
} catch (TupleSourceNotFoundException err) {
} catch (MetaMatrixComponentException err) {
e = err;
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -192,7 +192,7 @@
return true;
}
- if(obj == null || ! (obj instanceof ManagedBatch)) {
+ if(! (obj instanceof ManagedBatch)) {
return false;
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceImpl.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -40,7 +40,6 @@
private final BufferManagerImpl bufferManagerImpl;
private TupleSourceID tupleSourceID;
private List<?> schema;
- private int batchSize;
private WeakReference<TupleBatch> currentBatch;
private int currentRow = 1;
private int mark = 1;
@@ -50,7 +49,6 @@
this.bufferManagerImpl = bufferManagerImpl;
this.tupleSourceID = tupleSourceID;
this.schema = schema;
- this.batchSize = batchSize;
}
@Override
@@ -122,7 +120,7 @@
TupleBatch batch = getCurrentBatch();
if(batch != null ) {
try {
- this.bufferManagerImpl.unpinTupleBatch(this.tupleSourceID, batch.getBeginRow(), batch.getEndRow());
+ this.bufferManagerImpl.unpinTupleBatch(this.tupleSourceID, batch.getBeginRow());
} catch (TupleSourceNotFoundException e) {
throw new MetaMatrixComponentException(e);
} finally {
@@ -143,7 +141,7 @@
}
try{
- batch = this.bufferManagerImpl.pinTupleBatch(this.tupleSourceID, currentRow, (currentRow + batchSize -1));
+ batch = this.bufferManagerImpl.pinTupleBatch(this.tupleSourceID, currentRow);
currentBatch = new WeakReference<TupleBatch>(batch);
} catch (MemoryNotAvailableException e) {
/* Defect 18499 - ProcessWorker doesn't know how to handle MemoryNotAvailableException properly,
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -106,7 +106,7 @@
this.matchEnd = -1;
}
- //TODO: save partial work
+ //TODO: save partial work or combine with the sort operation
public void computeBatchBounds(SourceState state) throws TupleSourceNotFoundException, MetaMatrixComponentException {
if (endTuples != null) {
return;
@@ -117,12 +117,12 @@
while (beginRow <= state.getRowCount()) {
TupleBatch batch = null;
try {
- batch = this.joinNode.getBufferManager().pinTupleBatch(state.getTupleSourceID(), beginRow, beginRow + this.joinNode.getBatchSize() - 1);
+ batch = this.joinNode.getBufferManager().pinTupleBatch(state.getTupleSourceID(), beginRow);
if (batch.getRowCount() == 0) {
break;
}
beginRow = batch.getEndRow() + 1;
- this.joinNode.getBufferManager().unpinTupleBatch(state.getTupleSourceID(), batch.getBeginRow(), batch.getEndRow());
+ this.joinNode.getBufferManager().unpinTupleBatch(state.getTupleSourceID(), batch.getBeginRow());
if (!bounds.isEmpty()) {
overlap.add(comp.compare(bounds.get(bounds.size() - 1), batch.getTuple(batch.getBeginRow())) == 0);
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortNode.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -148,9 +148,8 @@
return terminationBatch;
}
int beginPinned = this.outputBeginRow;
- int endPinned = this.outputBeginRow+getBatchSize()-1;
try {
- TupleBatch outputBatch = getBufferManager().pinTupleBatch(outputID, beginPinned, endPinned);
+ TupleBatch outputBatch = getBufferManager().pinTupleBatch(outputID, beginPinned);
this.outputBeginRow += outputBatch.getRowCount();
@@ -164,7 +163,7 @@
} catch(MemoryNotAvailableException e) {
throw BlockedOnMemoryException.INSTANCE;
} finally {
- getBufferManager().unpinTupleBatch(outputID, beginPinned, endPinned);
+ getBufferManager().unpinTupleBatch(outputID, beginPinned);
}
}
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -164,7 +164,7 @@
while(!doneReading) {
try {
// Load and pin batch
- TupleBatch batch = bufferManager.pinTupleBatch(sourceID, sortPhaseRow, sortPhaseRow + batchSize - 1);
+ TupleBatch batch = bufferManager.pinTupleBatch(sourceID, sortPhaseRow);
if (batch.getRowCount() == 0) {
if (bufferManager.getStatus(sourceID) == TupleSourceStatus.FULL) {
@@ -205,7 +205,7 @@
// Clean up - unpin rows
for (int[] bounds : pinned) {
- bufferManager.unpinTupleBatch(sourceID, bounds[0], bounds[1]);
+ bufferManager.unpinTupleBatch(sourceID, bounds[0]);
}
}
@@ -245,7 +245,7 @@
for(; sortedIndex<activeTupleIDs.size(); sortedIndex++) {
TupleSourceID activeID = activeTupleIDs.get(sortedIndex);
try {
- TupleBatch sortedBatch = bufferManager.pinTupleBatch(activeID, 1, this.batchSize);
+ TupleBatch sortedBatch = bufferManager.pinTupleBatch(activeID, 1);
workingBatches.add(sortedBatch);
} catch(MemoryNotAvailableException e) {
break;
@@ -379,10 +379,9 @@
TupleSourceID tsID = unpinWorkingBatch(batchIndex, currentBatch);
int beginRow = workingPointers[batchIndex];
- int endRow = beginRow + this.batchSize - 1;
try {
- TupleBatch newBatch = bufferManager.pinTupleBatch(tsID, beginRow, endRow);
+ TupleBatch newBatch = bufferManager.pinTupleBatch(tsID, beginRow);
if(newBatch.getRowCount() == 0) {
// Done with this working batch
workingBatches.set(batchIndex, null);
@@ -400,8 +399,7 @@
MetaMatrixComponentException {
TupleSourceID tsID = activeTupleIDs.get(batchIndex);
int lastBeginRow = currentBatch.getBeginRow();
- int lastEndRow = currentBatch.getEndRow();
- bufferManager.unpinTupleBatch(tsID, lastBeginRow, lastEndRow);
+ bufferManager.unpinTupleBatch(tsID, lastBeginRow);
return tsID;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DQPCore.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -269,14 +269,14 @@
}
public ResultsFuture<ResultsMessage> processCursorRequest(long reqID,
- int batchFirst, int batchLast) throws MetaMatrixProcessingException {
+ int batchFirst, int fetchSize) throws MetaMatrixProcessingException {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_DQP, "DQP process cursor request from " + batchFirst + " to " + batchLast); //$NON-NLS-1$//$NON-NLS-2$
+ LogManager.logDetail(LogConstants.CTX_DQP, "DQP process cursor request from " + batchFirst); //$NON-NLS-1$
}
DQPWorkContext workContext = DQPWorkContext.getWorkContext();
ResultsFuture<ResultsMessage> resultsFuture = new ResultsFuture<ResultsMessage>();
RequestWorkItem workItem = getRequestWorkItem(workContext.getRequestID(reqID));
- workItem.requestMore(batchFirst, batchLast, resultsFuture.getResultsReceiver());
+ workItem.requestMore(batchFirst, batchFirst + Math.min(fetchSize, maxFetchSize) - 1, resultsFuture.getResultsReceiver());
return resultsFuture;
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -26,7 +26,6 @@
import java.nio.charset.Charset;
import java.sql.SQLException;
-import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.lob.ByteLobChunkStream;
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.lob.LobChunkProducer;
@@ -45,7 +44,7 @@
LobChunkProducer internalStream = null;
- public LobChunkStream(Streamable<?> streamable, int chunkSize, BufferManager bufferMgr)
+ public LobChunkStream(Streamable<?> streamable, int chunkSize)
throws IOException {
try {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -120,7 +120,7 @@
// get the reference object in the buffer manager, and try to stream off
// the original sources.
Streamable<?> streamable = dqpCore.getBufferManager().getStreamable(parent.resultsID, referenceStreamId);
- return new LobChunkStream(streamable, chunkSize, dqpCore.getBufferManager());
+ return new LobChunkStream(streamable, chunkSize);
}
synchronized void setResultsReceiver(ResultsReceiver<LobChunk> resultsReceiver) {
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -59,7 +59,6 @@
import com.metamatrix.common.xa.XATransactionException;
import com.metamatrix.core.MetaMatrixCoreException;
import com.metamatrix.core.log.MessageLevel;
-import com.metamatrix.core.util.Assertion;
import com.metamatrix.dqp.DQPPlugin;
import com.metamatrix.dqp.exception.SourceWarning;
import com.metamatrix.dqp.message.AtomicRequestID;
@@ -104,7 +103,6 @@
}
this.begin = beginRow;
this.end = endRow;
- Assertion.assertTrue(end - begin >= 0);
this.resultsRequested = true;
}
@@ -427,8 +425,19 @@
try {
if (batch == null || batch.getBeginRow() > this.resultsCursor.begin) {
- batch = this.bufferMgr.pinTupleBatch(resultsID, resultsCursor.begin, resultsCursor.end);
+ batch = this.bufferMgr.pinTupleBatch(resultsID, resultsCursor.begin);
pinned = true;
+ //TODO: support fetching more than 1 batch
+ int count = this.resultsCursor.end - this.resultsCursor.begin + 1;
+ if (batch.getRowCount() > count) {
+ int beginRow = Math.min(this.resultsCursor.begin, batch.getEndRow() - count + 1);
+ int endRow = Math.min(beginRow + count - 1, batch.getEndRow());
+ int firstOffset = beginRow - batch.getBeginRow();
+ List[] memoryRows = batch.getAllTuples();
+ List[] rows = new List[count];
+ System.arraycopy(memoryRows, firstOffset, rows, 0, endRow - beginRow + 1);
+ batch = new TupleBatch(beginRow, rows);
+ }
}
int finalRowCount = doneProducingBatches?this.processor.getHighestRow():-1;
@@ -480,7 +489,7 @@
} finally {
try {
if (pinned) {
- this.bufferMgr.unpinTupleBatch(this.resultsID, batch.getBeginRow(), batch.getEndRow());
+ this.bufferMgr.unpinTupleBatch(this.resultsID, batch.getBeginRow());
}
} catch (Exception e) {
// ignore - nothing more we can do
Modified: trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -22,6 +22,8 @@
package com.metamatrix.common.buffer.impl;
+import static org.junit.Assert.*;
+
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
@@ -32,7 +34,7 @@
import java.util.Properties;
import java.util.Random;
-import junit.framework.TestCase;
+import org.junit.Test;
import com.metamatrix.api.exception.MetaMatrixComponentException;
import com.metamatrix.api.exception.MetaMatrixProcessingException;
@@ -59,20 +61,12 @@
/**
*/
-public class TestBufferManagerImpl extends TestCase {
+public class TestBufferManagerImpl {
- /**
- * Constructor for TestBufferManagerImpl.
- * @param arg0
- */
- public TestBufferManagerImpl(String arg0) {
- super(arg0);
- }
-
public static BufferManager getTestBufferManager(long bytesAvailable, StorageManager sm2) throws MetaMatrixComponentException {
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, "" + bytesAvailable); //$NON-NLS-1$
+ bmProps.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, String.valueOf(bytesAvailable));
bmProps.setProperty(BufferManagerPropertyNames.MANAGEMENT_INTERVAL, "0"); //$NON-NLS-1$
BufferManager bufferManager = new BufferManagerImpl();
bufferManager.initialize("local", bmProps); //$NON-NLS-1$
@@ -164,14 +158,14 @@
ts.closeSource();
}
- public void testSpanStorage() throws Exception {
+ @Test public void testSpanStorage() throws Exception {
helpTestAddBatches(createFakeDatabaseStorageManager(),
1,
50,
100);
}
- public void testStandalone() throws Exception {
+ @Test public void testStandalone() throws Exception {
helpTestAddBatches(null,
10,
5,
@@ -212,7 +206,7 @@
return subRows;
}
- public void testCreateLobReference() throws Exception {
+ @Test public void testCreateLobReference() throws Exception {
final BufferManagerImpl mgr = (BufferManagerImpl)getTestBufferManager(1, createFakeDatabaseStorageManager());
XMLType xml1 = new XMLType(new SQLXMLImpl("<foo/>")); //$NON-NLS-1$
@@ -231,15 +225,24 @@
// when adding to the tuple source the reference id is assigned
// but not the persistence stream id.
- assertTrue(xml1.getReferenceStreamId() != null);
- assertTrue(xml2.getReferenceStreamId() != null);
- assertTrue(xml1.getPersistenceStreamId() == null);
- assertTrue(xml2.getPersistenceStreamId() == null);
+ assertNotNull(xml1.getReferenceStreamId());
+ assertNotNull(xml2.getReferenceStreamId());
+ assertNull(xml1.getPersistenceStreamId());
+ assertNull(xml2.getPersistenceStreamId());
assertNotNull(mgr.getStreamable(id, xml1.getReferenceStreamId()));
+
+ final TupleSourceID id1 = mgr.createTupleSource(schema, new String[] {DataTypeManager.DefaultDataTypes.XML}, "GROUP1", TupleSourceType.PROCESSOR); //$NON-NLS-1$
+
+ TupleBatch batch1 = new TupleBatch(1, new List[] {xmlList1, xmlList2});
+ mgr.addTupleBatch(id1, batch1);
+
+ //assure that even though we've removed a later tuple source, the reference still exists
+ mgr.removeTupleSource(id1);
+ assertNotNull(mgr.getStreamable(id, xml1.getReferenceStreamId()));
}
- public void testAddStreamablePart() throws Exception {
+ @Test public void testAddStreamablePart() throws Exception {
final BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
// save the lob
@@ -300,7 +303,7 @@
}
- public void testPinning1() throws Exception {
+ @Test public void testPinning1() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
@@ -321,9 +324,9 @@
int readPerBatch = 1000;
for(int i=1; i<maxRows; i=i+readPerBatch) {
int end = i+readPerBatch-1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
+ TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i);
helpCompareBatches(exampleBatch(i, end), checkBatch);
- mgr.unpinTupleBatch(tsID, i, end);
+ mgr.unpinTupleBatch(tsID, i);
}
// Remove
@@ -331,7 +334,7 @@
}
- public void testUnpinOfUnpinnedBatch() throws Exception {
+ @Test public void testUnpinOfUnpinnedBatch() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
@@ -352,20 +355,28 @@
int readPerBatch = 100;
for(int i=1; i<maxRows; i=i+readPerBatch) {
int end = i+readPerBatch-1;
- mgr.unpinTupleBatch(tsID, i, end);
+ mgr.unpinTupleBatch(tsID, i);
}
// Walk through by 100's pinning and unpinning
- for(int i=1; i<maxRows; i=i+readPerBatch) {
- int end = i+readPerBatch-1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
- helpCompareBatches(exampleBatch(i, end), checkBatch);
- mgr.unpinTupleBatch(tsID, i, end);
- }
+ helpTestPinAndUnpin(mgr, tsID, maxRows, writePerBatch, readPerBatch);
// Remove
mgr.removeTupleSource(tsID);
}
+
+ private void helpTestPinAndUnpin(BufferManager mgr, TupleSourceID tsID,
+ int maxRows, int writePerBatch, int readPerBatch)
+ throws TupleSourceNotFoundException, MemoryNotAvailableException,
+ MetaMatrixComponentException {
+ for(int i=1; i<maxRows; i=i+readPerBatch) {
+ int end = (1+i/writePerBatch)*writePerBatch;
+ int begin = end - writePerBatch + 1;
+ TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i);
+ helpCompareBatches(exampleBatch(begin, end), checkBatch);
+ mgr.unpinTupleBatch(tsID, i);
+ }
+ }
private TupleBatch exampleBigBatch(int begin, int end, int charsPerRow) {
@@ -385,7 +396,7 @@
return new TupleBatch(begin, rows);
}
- public void testDeadlockOnMultiThreadClean() throws Exception {
+ @Test public void testDeadlockOnMultiThreadClean() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
@@ -422,7 +433,7 @@
}
}
- public void testSessionMax_Fail() throws Exception {
+ @Test public void testSessionMax_Fail() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
@@ -443,8 +454,8 @@
// Walk through by 1000's pinning and unpinning
int readPerBatch = 1000;
for(int i=1; i<maxRows; i=i+readPerBatch) {
- int end = i+readPerBatch-1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
+ int end = (1+i/writePerBatch)*writePerBatch;
+ TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i);
helpCompareBatches(exampleBatch(i, end), checkBatch);
}
fail("Should have failed"); //$NON-NLS-1$
@@ -463,7 +474,7 @@
* @throws Exception
* @since 4.3
*/
- public void testDefect_18499() throws Exception {
+ @Test public void testDefect_18499() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
schema.add("col"); //$NON-NLS-1$
@@ -484,7 +495,7 @@
}
}
- public void testDefect18497() throws Exception {
+ @Test public void testDefect18497() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
@@ -504,12 +515,7 @@
// Walk through by 1000's pinning and unpinning
int readPerBatch = 1000;
- for(int i=1; i<maxRows; i=i+readPerBatch) {
- int end = i+readPerBatch-1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
- helpCompareBatches(exampleBatch(i, end), checkBatch);
- mgr.unpinTupleBatch(tsID, i, end);
- }
+ helpTestPinAndUnpin(mgr, tsID, maxRows, writePerBatch, readPerBatch);
} finally {
// Remove
mgr.removeTupleSource(tsID);
@@ -518,7 +524,7 @@
}
//two threads do the cleaning at the same time
- public void testDefect19325() throws Exception{
+ @Test public void testDefect19325() throws Exception{
BufferManagerImpl mgr = (BufferManagerImpl)getTestBufferManager(1, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
List schema = new ArrayList();
@@ -554,7 +560,7 @@
}
//test many small batches
- public void testSmallBatches() throws Exception{
+ @Test public void testSmallBatches() throws Exception{
BufferManager mgr = getTestBufferManager(50, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
@@ -588,7 +594,7 @@
}
//going backward
- public void testPinning2() throws Exception {
+ @Test public void testPinning2() throws Exception {
BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
@@ -607,21 +613,11 @@
// Walk through by 100's pinning and unpinning
int readPerBatch = 100;
- for(int i=maxRows; i>=1; i=i-readPerBatch) {
- int end = i-readPerBatch+1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
- helpCompareBatches(exampleBatch(end, i), checkBatch);
- mgr.unpinTupleBatch(tsID, end, i);
- }
+ helpTestPinAndUnpin(mgr, tsID, maxRows, writePerBatch, readPerBatch);
// Walk through by 2000's pinning and unpinning
readPerBatch = 2000;
- for(int i=maxRows; i>=1; i=i-readPerBatch) {
- int end = i-readPerBatch+1;
- TupleBatch checkBatch = mgr.pinTupleBatch(tsID, i, end);
- helpCompareBatches(exampleBatch(end + (readPerBatch - writePerBatch), i), checkBatch);
- mgr.unpinTupleBatch(tsID, end + (readPerBatch - writePerBatch), i);
- }
+ helpTestPinAndUnpin(mgr, tsID, maxRows, writePerBatch, readPerBatch);
// Remove
mgr.removeTupleSource(tsID);
@@ -679,8 +675,8 @@
int batch = random.nextInt(batches);
begin = 1 + (batch * rowsPerBatch);
end = begin + rowsPerBatch - 1;
- bufferMgr.pinTupleBatch(tsID, begin, end);
- bufferMgr.unpinTupleBatch(tsID, begin, end);
+ bufferMgr.pinTupleBatch(tsID, begin);
+ bufferMgr.unpinTupleBatch(tsID, begin);
} catch(MemoryNotAvailableException e) {
}
@@ -690,7 +686,7 @@
e.printStackTrace();
} finally {
try {
- bufferMgr.unpinTupleBatch(tsID, begin, end);
+ bufferMgr.unpinTupleBatch(tsID, begin);
} catch(Exception e) {
// ignore
}
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -95,11 +95,10 @@
private boolean wasBlocked;
/**
- * @see com.metamatrix.common.buffer.impl.BufferManagerImpl#pinTupleBatch(com.metamatrix.common.buffer.TupleSourceID, int, int)
+ * @see com.metamatrix.common.buffer.impl.BufferManagerImpl#pinTupleBatch(com.metamatrix.common.buffer.TupleSourceID, int)
*/
public TupleBatch pinTupleBatch(TupleSourceID tupleSourceID,
- int beginRow,
- int maxEndRow) throws TupleSourceNotFoundException,
+ int beginRow) throws TupleSourceNotFoundException,
MemoryNotAvailableException,
MetaMatrixComponentException {
if (blockOn != null && blockOn.contains(new Integer(++pinCount))) {
@@ -109,7 +108,7 @@
wasBlocked = false;
- return super.pinTupleBatch(tupleSourceID, beginRow, maxEndRow);
+ return super.pinTupleBatch(tupleSourceID, beginRow);
}
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2009-08-12 23:58:48 UTC (rev 1238)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2009-08-13 03:32:14 UTC (rev 1239)
@@ -29,12 +29,15 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import org.teiid.dqp.internal.datamgr.impl.FakeTransactionService;
import org.teiid.dqp.internal.process.DQPCore;
import org.teiid.dqp.internal.process.DQPWorkContext;
import org.teiid.dqp.internal.process.DQPCore.ConnectorCapabilitiesCache;
-import junit.framework.TestCase;
+import static org.junit.Assert.*;
import com.metamatrix.api.exception.query.QueryResolverException;
import com.metamatrix.common.application.ApplicationEnvironment;
@@ -55,16 +58,11 @@
import com.metamatrix.query.unittest.FakeMetadataFactory;
-public class TestDQPCore extends TestCase {
+public class TestDQPCore {
- public TestDQPCore(String name) {
- super(name);
- }
-
private DQPCore core;
- @Override
- protected void setUp() throws Exception {
+ @Before public void setUp() throws Exception {
DQPWorkContext workContext = new DQPWorkContext();
workContext.setVdbName("bqt"); //$NON-NLS-1$
workContext.setVdbVersion("1"); //$NON-NLS-1$
@@ -96,8 +94,7 @@
core.start(new Properties());
}
- @Override
- protected void tearDown() throws Exception {
+ @After public void tearDown() throws Exception {
DQPWorkContext.setWorkContext(new DQPWorkContext());
core.stop();
}
@@ -112,84 +109,84 @@
return msg;
}
- public void testRequest1() throws Exception {
+ @Test public void testRequest1() throws Exception {
helpExecute("SELECT IntKey FROM BQT1.SmallA", "a"); //$NON-NLS-1$ //$NON-NLS-2$
}
- public void testUser1() throws Exception {
+ @Test public void testUser1() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() = 'logon'"; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser2() throws Exception {
+ @Test public void testUser2() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() LIKE 'logon'"; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser3() throws Exception {
+ @Test public void testUser3() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() IN ('logon3') AND StringKey LIKE '1'"; //$NON-NLS-1$
String userName = "logon3"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser4() throws Exception {
+ @Test public void testUser4() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE 'logon4' = user() AND StringKey = '1'"; //$NON-NLS-1$
String userName = "logon4"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser5() throws Exception {
+ @Test public void testUser5() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() IS NULL "; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser6() throws Exception {
+ @Test public void testUser6() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() = 'logon33' "; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser7() throws Exception {
+ @Test public void testUser7() throws Exception {
String sql = "UPDATE BQT1.SmallA SET IntKey = 2 WHERE user() = 'logon' AND StringKey = '1' "; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser8() throws Exception {
+ @Test public void testUser8() throws Exception {
String sql = "SELECT user(), StringKey FROM BQT1.SmallA WHERE IntKey = 1 "; //$NON-NLS-1$
String userName = "logon"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testUser9() throws Exception {
+ @Test public void testUser9() throws Exception {
String sql = "SELECT IntKey FROM BQT1.SmallA WHERE user() = StringKey AND StringKey = '1' "; //$NON-NLS-1$
String userName = "1"; //$NON-NLS-1$
helpExecute(sql, userName);
}
- public void testEnvSessionId() throws Exception {
+ @Test public void testEnvSessionId() throws Exception {
String sql = "SELECT env('sessionid') as SessionID"; //$NON-NLS-1$
String userName = "1"; //$NON-NLS-1$
ResultsMessage rm = helpExecute(sql, userName);
assertEquals("1", rm.getResults()[0].get(0)); //$NON-NLS-1$
}
- public void testEnvSessionIdMixedCase() throws Exception {
+ @Test public void testEnvSessionIdMixedCase() throws Exception {
String sql = "SELECT env('sEsSIonId') as SessionID"; //$NON-NLS-1$
String userName = "1"; //$NON-NLS-1$
ResultsMessage rm = helpExecute(sql, userName);
assertEquals("1", rm.getResults()[0].get(0)); //$NON-NLS-1$
}
- public void testTxnAutoWrap() throws Exception {
+ @Test public void testTxnAutoWrap() throws Exception {
String sql = "SELECT * FROM BQT1.SmallA"; //$NON-NLS-1$
helpExecute(sql, "a", 1, true); //$NON-NLS-1$
}
- public void testPlanOnly() throws Exception {
+ @Test public void testPlanOnly() throws Exception {
String sql = "SELECT * FROM BQT1.SmallA option planonly"; //$NON-NLS-1$
helpExecute(sql,"a"); //$NON-NLS-1$
}
@@ -198,7 +195,7 @@
* Tests whether an exception result is sent when an exception occurs
* @since 4.3
*/
- public void testPlanningException() throws Exception {
+ @Test public void testPlanningException() throws Exception {
String sql = "SELECT IntKey FROM BQT1.BadIdea "; //$NON-NLS-1$
RequestMessage reqMsg = exampleRequestMessage(sql);
@@ -211,7 +208,7 @@
}
}
- public void testCapabilitesCache() {
+ @Test public void testCapabilitesCache() {
ConnectorCapabilitiesCache cache = new ConnectorCapabilitiesCache();
DQPWorkContext workContext = new DQPWorkContext();
workContext.setVdbName("foo"); //$NON-NLS-1$
@@ -226,11 +223,11 @@
assertNull(vdbCapabilites.get("model1")); //$NON-NLS-1$
}
- public void testLookupVisibility() throws Exception {
+ @Test public void testLookupVisibility() throws Exception {
helpTestVisibilityFails("select lookup('bqt3.smalla', 'intkey', 'stringkey', '?')"); //$NON-NLS-1$
}
- public void testCancel() throws Exception {
+ @Test public void testCancel() throws Exception {
assertFalse(this.core.cancelRequest(new RequestID(1)));
}
@@ -242,7 +239,7 @@
assertEquals("[QueryValidatorException]Group does not exist: BQT3.SmallA", results.getException().toString()); //$NON-NLS-1$
}
- public void testXQueryVisibility() throws Exception {
+ @Test public void testXQueryVisibility() throws Exception {
String xquery = "<Items>\r\n" + //$NON-NLS-1$
"{\r\n" + //$NON-NLS-1$
"for $x in doc(\"select * from bqt3.smalla\")//Item\r\n" + //$NON-NLS-1$
@@ -268,7 +265,9 @@
Future<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
ResultsMessage results = message.get(50000, TimeUnit.MILLISECONDS);
- assertNull(results.getException());
+ if (results.getException() != null) {
+ throw results.getException();
+ }
return results;
}
}
More information about the teiid-commits
mailing list