[teiid-commits] teiid SVN: r1763 - in trunk/engine/src: test/java/com/metamatrix/query/processor/relational and 1 other directory.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Wed Jan 20 22:59:25 EST 2010
Author: shawkins
Date: 2010-01-20 22:59:24 -0500 (Wed, 20 Jan 2010)
New Revision: 1763
Modified:
trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
Log:
TEIID-925: further refining the duplicate-removal strategy for performance.
Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-01-21 03:49:48 UTC (rev 1762)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java 2010-01-21 03:59:24 UTC (rev 1763)
@@ -95,7 +95,7 @@
private List<TupleBuffer> activeTupleBuffers = new ArrayList<TupleBuffer>();
private int masterSortIndex;
- private int dupRemoveSublists = 1; //used to control the number of sublists needed for dup remove
+ private int collected;
// Phase constants for readability
private static final int INITIAL_SORT = 1;
@@ -158,7 +158,8 @@
}
private TupleBuffer createTupleBuffer() throws MetaMatrixComponentException {
- return bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
+ TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
+ return tb;
}
/**
@@ -173,48 +174,63 @@
workingTuples = new TreeSet<List<?>>(comparator);
}
}
-
- int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
- while(!doneReading && workingTuples.size() < maxRows) {
- try {
- List<?> tuple = sourceID.nextTuple();
-
- if (tuple == null) {
- doneReading = true;
- break;
- }
-
- workingTuples.add(tuple);
- } catch(BlockedException e) {
- if ((workingTuples.size() < maxRows/2 && mode != Mode.DUP_REMOVE)
- || (workingTuples.size() < (dupRemoveSublists/4)*bufferManager.getProcessorBatchSize() && activeTupleBuffers.size() < dupRemoveSublists)) {
- throw e; //block if no work can be performed
- }
- break;
- }
- }
-
- if(workingTuples.isEmpty()) {
- break;
- }
+ int totalReservedBuffers = 0;
+ try {
+ int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
+ while(!doneReading) {
+ if (workingTuples.size() == maxRows) {
+ int reserved = bufferManager.reserveBuffers(1, false);
+ if (reserved == 0) {
+ break;
+ }
+ totalReservedBuffers += 1;
+ maxRows += bufferManager.getProcessorBatchSize();
+ }
+ try {
+ List<?> tuple = sourceID.nextTuple();
+
+ if (tuple == null) {
+ doneReading = true;
+ break;
+ }
+ if (workingTuples.add(tuple)) {
+ this.collected++;
+ }
+ } catch(BlockedException e) {
+ if (mode != Mode.DUP_REMOVE
+ || (this.output != null && collected < this.output.getRowCount() * 2)
+ || (this.output == null && this.workingTuples.isEmpty() && this.activeTupleBuffers.isEmpty())) {
+ throw e; //block if no work can be performed
+ }
+ break;
+ }
+ }
- TupleBuffer sublist = createTupleBuffer();
- activeTupleBuffers.add(sublist);
- if (this.mode == Mode.SORT) {
- //perform a stable sort
- Collections.sort((List<List<?>>)workingTuples, comparator);
- }
- for (List<?> list : workingTuples) {
- sublist.addTuple(list);
- }
- workingTuples = null;
- sublist.saveBatch();
+ if(workingTuples.isEmpty()) {
+ break;
+ }
+
+ TupleBuffer sublist = createTupleBuffer();
+ activeTupleBuffers.add(sublist);
+ if (this.mode == Mode.SORT) {
+ //perform a stable sort
+ Collections.sort((List<List<?>>)workingTuples, comparator);
+ }
+ for (List<?> list : workingTuples) {
+ sublist.addTuple(list);
+ }
+ workingTuples = null;
+
+ sublist.saveBatch();
+ } finally {
+ bufferManager.releaseBuffers(totalReservedBuffers);
+ }
}
if (this.activeTupleBuffers.isEmpty()) {
activeTupleBuffers.add(createTupleBuffer());
}
- this.dupRemoveSublists = Math.min(dupRemoveSublists * 2, bufferManager.getMaxProcessingBatches() * 2);
+ this.collected = 0;
this.phase = MERGE;
}
@@ -224,45 +240,54 @@
TupleBuffer merged = createTupleBuffer();
- int maxSortIndex = Math.min(this.bufferManager.getMaxProcessingBatches() * 2, activeTupleBuffers.size());
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
+ int maxSortIndex = Math.min(activeTupleBuffers.size(), this.bufferManager.getMaxProcessingBatches());
+ int reservedBuffers = 0;
+ if (activeTupleBuffers.size() > maxSortIndex) {
+ reservedBuffers = bufferManager.reserveBuffers(activeTupleBuffers.size() - maxSortIndex, true);
}
- // initialize the sublists with the min value
- for(int i = 0; i<maxSortIndex; i++) {
- TupleBuffer activeID = activeTupleBuffers.get(i);
- SortedSublist sortedSublist = new SortedSublist();
- sortedSublist.its = activeID.createIndexedTupleSource();
- sortedSublist.index = i;
- if (activeID == output) {
- sortedSublist.limit = output.getRowCount();
- }
- incrementWorkingTuple(sublists, sortedSublist);
+ maxSortIndex += reservedBuffers;
+ try {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ // initialize the sublists with the min value
+ for(int i = 0; i<maxSortIndex; i++) {
+ TupleBuffer activeID = activeTupleBuffers.get(i);
+ SortedSublist sortedSublist = new SortedSublist();
+ sortedSublist.its = activeID.createIndexedTupleSource();
+ sortedSublist.index = i;
+ if (activeID == output) {
+ sortedSublist.limit = output.getRowCount();
+ }
+ incrementWorkingTuple(sublists, sortedSublist);
+ }
+
+ // iteratively process the lowest tuple
+ while (sublists.size() > 0) {
+ SortedSublist sortedSublist = sublists.remove(sublists.size() - 1);
+ merged.addTuple(sortedSublist.tuple);
+ if (this.output != null && sortedSublist.index > masterSortIndex) {
+ this.output.addTuple(sortedSublist.tuple); //a new distinct row
+ }
+ incrementWorkingTuple(sublists, sortedSublist);
+ }
+
+ // Remove merged sublists
+ for(int i=0; i<maxSortIndex; i++) {
+ TupleBuffer id = activeTupleBuffers.remove(0);
+ if (id != this.output) {
+ id.remove();
+ }
+ }
+ merged.saveBatch();
+ this.activeTupleBuffers.add(merged);
+ masterSortIndex = masterSortIndex - maxSortIndex + 1;
+ if (masterSortIndex < 0) {
+ masterSortIndex = this.activeTupleBuffers.size() - 1;
+ }
+ } finally {
+ this.bufferManager.releaseBuffers(reservedBuffers);
}
-
- // iteratively process the lowest tuple
- while (sublists.size() > 0) {
- SortedSublist sortedSublist = sublists.remove(sublists.size() - 1);
- merged.addTuple(sortedSublist.tuple);
- if (this.output != null && sortedSublist.index > masterSortIndex) {
- this.output.addTuple(sortedSublist.tuple); //a new distinct row
- }
- incrementWorkingTuple(sublists, sortedSublist);
- }
-
- // Remove merged sublists
- for(int i=0; i<maxSortIndex; i++) {
- TupleBuffer id = activeTupleBuffers.remove(0);
- if (id != this.output) {
- id.remove();
- }
- }
- merged.saveBatch();
- this.activeTupleBuffers.add(merged);
- masterSortIndex = masterSortIndex - maxSortIndex + 1;
- if (masterSortIndex < 0) {
- masterSortIndex = this.activeTupleBuffers.size() - 1;
- }
}
// Close sorted source (all others have been removed)
Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java 2010-01-21 03:49:48 UTC (rev 1762)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java 2010-01-21 03:59:24 UTC (rev 1763)
@@ -295,6 +295,7 @@
}
tsid.addTuple(Arrays.asList(2));
+ tsid.addTuple(Arrays.asList(3));
su.sort();
assertEquals(Arrays.asList(2), ts.nextTuple());
}
More information about the teiid-commits mailing list