[teiid-commits] teiid SVN: r1763 - in trunk/engine/src: test/java/com/metamatrix/query/processor/relational and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Jan 20 22:59:25 EST 2010


Author: shawkins
Date: 2010-01-20 22:59:24 -0500 (Wed, 20 Jan 2010)
New Revision: 1763

Modified:
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
Log:
TEIID-925 Further refining the duplicate-removal strategy for performance.

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-01-21 03:49:48 UTC (rev 1762)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2010-01-21 03:59:24 UTC (rev 1763)
@@ -95,7 +95,7 @@
     private List<TupleBuffer> activeTupleBuffers = new ArrayList<TupleBuffer>();
     private int masterSortIndex;
     
-    private int dupRemoveSublists = 1;     //used to control the number of sublists needed for dup remove
+    private int collected;
 
     // Phase constants for readability
     private static final int INITIAL_SORT = 1;
@@ -158,7 +158,8 @@
     }
 
 	private TupleBuffer createTupleBuffer() throws MetaMatrixComponentException {
-		return bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
+		TupleBuffer tb = bufferManager.createTupleBuffer(this.schema, this.groupName, TupleSourceType.PROCESSOR);
+		return tb;
 	}
     
 	/**
@@ -173,48 +174,63 @@
 	            	workingTuples = new TreeSet<List<?>>(comparator);
 	            }
     		}
-            
-            int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
-	        while(!doneReading && workingTuples.size() < maxRows) {
-	            try {
-	            	List<?> tuple = sourceID.nextTuple();
-	            	
-	            	if (tuple == null) {
-	            		doneReading = true;
-	            		break;
-	            	}
-	            	
-                    workingTuples.add(tuple);
-	            } catch(BlockedException e) {
-	            	if ((workingTuples.size() < maxRows/2 && mode != Mode.DUP_REMOVE) 
-	            			|| (workingTuples.size() < (dupRemoveSublists/4)*bufferManager.getProcessorBatchSize() && activeTupleBuffers.size() < dupRemoveSublists)) {
-            			throw e; //block if no work can be performed
-	            	}
-	            	break;
-	            } 
-	        }
-	
-	        if(workingTuples.isEmpty()) {
-	        	break;
-	        }
+            int totalReservedBuffers = 0;
+            try {
+	            int maxRows = bufferManager.getMaxProcessingBatches() * bufferManager.getProcessorBatchSize();
+		        while(!doneReading) {
+		        	if (workingTuples.size() == maxRows) {
+		        		int reserved = bufferManager.reserveBuffers(1, false);
+		        		if (reserved == 0) {
+		        			break;
+		        		} 
+		        		totalReservedBuffers += 1;
+		        		maxRows += bufferManager.getProcessorBatchSize();
+		        	}
+		            try {
+		            	List<?> tuple = sourceID.nextTuple();
+		            	
+		            	if (tuple == null) {
+		            		doneReading = true;
+		            		break;
+		            	}
+	                    if (workingTuples.add(tuple)) {
+	                    	this.collected++;
+	                    }
+		            } catch(BlockedException e) {
+		            	if (mode != Mode.DUP_REMOVE  
+		            			|| (this.output != null && collected < this.output.getRowCount() * 2) 
+		            			|| (this.output == null && this.workingTuples.isEmpty() && this.activeTupleBuffers.isEmpty())) {
+	            			throw e; //block if no work can be performed
+		            	}
+		            	break;
+		            } 
+		        } 
 		
-	        TupleBuffer sublist = createTupleBuffer();
-	        activeTupleBuffers.add(sublist);
-	        if (this.mode == Mode.SORT) {
-	        	//perform a stable sort
-	    		Collections.sort((List<List<?>>)workingTuples, comparator);
-	        }
-	        for (List<?> list : workingTuples) {
-				sublist.addTuple(list);
-			}
-	        workingTuples = null;
-	        sublist.saveBatch();
+		        if(workingTuples.isEmpty()) {
+		        	break;
+		        }
+			
+		        TupleBuffer sublist = createTupleBuffer();
+		        activeTupleBuffers.add(sublist);
+		        if (this.mode == Mode.SORT) {
+		        	//perform a stable sort
+		    		Collections.sort((List<List<?>>)workingTuples, comparator);
+		        }
+		        for (List<?> list : workingTuples) {
+					sublist.addTuple(list);
+				}
+		        workingTuples = null;
+	            
+		        sublist.saveBatch();
+            } finally {
+            	bufferManager.releaseBuffers(totalReservedBuffers);
+            }
         }
     	
     	if (this.activeTupleBuffers.isEmpty()) {
             activeTupleBuffers.add(createTupleBuffer());
         }  
-    	this.dupRemoveSublists = Math.min(dupRemoveSublists * 2, bufferManager.getMaxProcessingBatches() * 2);
+    	this.collected = 0;
         this.phase = MERGE;
     }
 
@@ -224,45 +240,54 @@
             
             TupleBuffer merged = createTupleBuffer();
 
-            int maxSortIndex = Math.min(this.bufferManager.getMaxProcessingBatches() * 2, activeTupleBuffers.size());
-        	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
-            	LogManager.logTrace(LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
+            int maxSortIndex = Math.min(activeTupleBuffers.size(), this.bufferManager.getMaxProcessingBatches());
+            int reservedBuffers = 0;
+            if (activeTupleBuffers.size() > maxSortIndex) {
+            	reservedBuffers = bufferManager.reserveBuffers(activeTupleBuffers.size() - maxSortIndex, true);
             }
-        	// initialize the sublists with the min value
-            for(int i = 0; i<maxSortIndex; i++) { 
-             	TupleBuffer activeID = activeTupleBuffers.get(i);
-            	SortedSublist sortedSublist = new SortedSublist();
-            	sortedSublist.its = activeID.createIndexedTupleSource();
-            	sortedSublist.index = i;
-            	if (activeID == output) {
-            		sortedSublist.limit = output.getRowCount();
-            	}
-            	incrementWorkingTuple(sublists, sortedSublist);
+            maxSortIndex += reservedBuffers;
+            try {
+	        	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+	            	LogManager.logTrace(LogConstants.CTX_DQP, "Merging", maxSortIndex, "sublists out of", activeTupleBuffers.size()); //$NON-NLS-1$ //$NON-NLS-2$
+	            }
+	        	// initialize the sublists with the min value
+	            for(int i = 0; i<maxSortIndex; i++) { 
+	             	TupleBuffer activeID = activeTupleBuffers.get(i);
+	             	SortedSublist sortedSublist = new SortedSublist();
+	            	sortedSublist.its = activeID.createIndexedTupleSource();
+	            	sortedSublist.index = i;
+	            	if (activeID == output) {
+	            		sortedSublist.limit = output.getRowCount();
+	            	}
+	            	incrementWorkingTuple(sublists, sortedSublist);
+	            }
+	            
+	            // iteratively process the lowest tuple
+	            while (sublists.size() > 0) {
+	            	SortedSublist sortedSublist = sublists.remove(sublists.size() - 1);
+	        		merged.addTuple(sortedSublist.tuple);
+	                if (this.output != null && sortedSublist.index > masterSortIndex) {
+	                	this.output.addTuple(sortedSublist.tuple); //a new distinct row
+	            	}
+	            	incrementWorkingTuple(sublists, sortedSublist);
+	            }                
+	
+	            // Remove merged sublists
+	            for(int i=0; i<maxSortIndex; i++) {
+	            	TupleBuffer id = activeTupleBuffers.remove(0);
+	            	if (id != this.output) {
+	            		id.remove();
+	            	}
+	            }
+	            merged.saveBatch();
+	            this.activeTupleBuffers.add(merged);           
+	            masterSortIndex = masterSortIndex - maxSortIndex + 1;
+	            if (masterSortIndex < 0) {
+	            	masterSortIndex = this.activeTupleBuffers.size() - 1;
+	            }
+            } finally {
+            	this.bufferManager.releaseBuffers(reservedBuffers);
             }
-            
-            // iteratively process the lowest tuple
-            while (sublists.size() > 0) {
-            	SortedSublist sortedSublist = sublists.remove(sublists.size() - 1);
-        		merged.addTuple(sortedSublist.tuple);
-                if (this.output != null && sortedSublist.index > masterSortIndex) {
-                	this.output.addTuple(sortedSublist.tuple); //a new distinct row
-            	}
-            	incrementWorkingTuple(sublists, sortedSublist);
-            }                
-
-            // Remove merged sublists
-            for(int i=0; i<maxSortIndex; i++) {
-            	TupleBuffer id = activeTupleBuffers.remove(0);
-            	if (id != this.output) {
-            		id.remove();
-            	}
-            }
-            merged.saveBatch();
-            this.activeTupleBuffers.add(merged);           
-            masterSortIndex = masterSortIndex - maxSortIndex + 1;
-            if (masterSortIndex < 0) {
-            	masterSortIndex = this.activeTupleBuffers.size() - 1;
-            }
         }
     	
         // Close sorted source (all others have been removed)

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java	2010-01-21 03:49:48 UTC (rev 1762)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java	2010-01-21 03:59:24 UTC (rev 1763)
@@ -295,6 +295,7 @@
     		
     	}
     	tsid.addTuple(Arrays.asList(2));
+    	tsid.addTuple(Arrays.asList(3));
     	su.sort();
     	assertEquals(Arrays.asList(2), ts.nextTuple());
     }



More information about the teiid-commits mailing list