[teiid-commits] teiid SVN: r4565 - in branches/7.7.x/engine/src: test/java/org/teiid/query/processor/relational and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Tue May 14 11:12:39 EDT 2013


Author: jolee
Date: 2013-05-14 11:12:39 -0400 (Tue, 14 May 2013)
New Revision: 4565

Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
Log:
TEIID-2363:  proactive buffering not occurring for the inner side of an outer join on "MERGE JOIN (SORT/ALREADY_SORTED)"

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2013-05-14 14:53:25 UTC (rev 4564)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/JoinNode.java	2013-05-14 15:12:39 UTC (rev 4565)
@@ -170,63 +170,61 @@
 		return clonedNode;
 	}
 
-	/**
-	 * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
-	 * @since 4.2
-	 */
-	protected TupleBatch nextBatchDirect() throws BlockedException,
-			TeiidComponentException, TeiidProcessingException {
-		try {
-			if (state == State.LOAD_LEFT) {
-				if (this.joinType != JoinType.JOIN_FULL_OUTER) {
-					this.joinStrategy.leftSource
-							.setImplicitBuffer(ImplicitBuffer.NONE);
-				}
-				// left child was already opened by the join node
-				this.joinStrategy.loadLeft();
-				if (isDependent()) {
-					TupleBuffer buffer = this.joinStrategy.leftSource
-							.getTupleBuffer();
-					// the tuplebuffer may be from a lower node, so pass in the
-					// schema
-					dvs = new DependentValueSource(buffer);
-					dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
-					this.getContext().getVariableContext()
-							.setGlobalValue(this.dependentValueSource, dvs);
-				}
-				state = State.LOAD_RIGHT;
-			}
-		} catch (BlockedException e) {
-			if (!isDependent()) {
-				this.joinStrategy.openRight();
-				this.joinStrategy.loadRight();
-			}
-			throw e;
-		}
-		try {
-			if (state == State.LOAD_RIGHT) {
-				this.joinStrategy.openRight();
-				this.joinStrategy.loadRight();
-				state = State.EXECUTE;
-			}
-			this.joinStrategy.process();
-			this.terminateBatches();
-		} catch (BatchAvailableException e) {
-			// pull the batch
-		} catch (BlockedException e) {
-			// TODO: this leads to duplicate exceptions, we
-			// could track which side is blocking
-			try {
-				this.joinStrategy.leftSource.prefetch(true);
-			} catch (BlockedException e1) {
+    /** 
+     * @see org.teiid.query.processor.relational.RelationalNode#nextBatchDirect()
+     * @since 4.2
+     */
+    protected TupleBatch nextBatchDirect() throws BlockedException,
+                                          TeiidComponentException,
+                                          TeiidProcessingException {
+    	try {
+	    	if (state == State.LOAD_LEFT) {
+	        	if (this.joinType != JoinType.JOIN_FULL_OUTER) {
+	            	this.joinStrategy.leftSource.setImplicitBuffer(ImplicitBuffer.NONE);
+	            }
+	        	//left child was already opened by the join node
+	            this.joinStrategy.loadLeft();
+	            if (isDependent()) { 
+	                TupleBuffer buffer = this.joinStrategy.leftSource.getTupleBuffer();
+	                //the tuplebuffer may be from a lower node, so pass in the schema
+	                dvs = new DependentValueSource(buffer);
+	                dvs.setDistinct(this.joinStrategy.leftSource.isDistinct());
+	                this.getContext().getVariableContext().setGlobalValue(this.dependentValueSource, dvs);
+	            }
+	            state = State.LOAD_RIGHT;
+	        }
+    	} catch (BlockedException e) {
+    		if (!isDependent()) {
+    			this.joinStrategy.openRight();
+                this.joinStrategy.loadRight();
+                this.joinStrategy.rightSource.prefetch(true);
+    		}
+    		throw e;
+    	}
+        try {
+            if (state == State.LOAD_RIGHT) {
+	        	this.joinStrategy.openRight();
+	            this.joinStrategy.loadRight();
+                state = State.EXECUTE;
+            }
+        	this.joinStrategy.process();
+        	this.terminateBatches();
+        } catch (BatchAvailableException e) {
+        	//pull the batch
+        } catch (BlockedException e) {
+        	//TODO: this leads to duplicate exceptions, we 
+        	//could track which side is blocking
+        	try {
+        		this.joinStrategy.leftSource.prefetch(true);
+        	} catch (BlockedException e1) {
+        		
+        	}
+        	this.joinStrategy.rightSource.prefetch(true);
+        	throw e;
+        }
+        return pullBatch();
+    }
 
-			}
-			this.joinStrategy.rightSource.prefetch(true);
-			throw e;
-		}
-		return pullBatch();
-	}
-
 	/**
 	 * @see org.teiid.query.processor.relational.RelationalNode#getDescriptionProperties()
 	 * @since 4.2

Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2013-05-14 14:53:25 UTC (rev 4564)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/relational/TestJoinNode.java	2013-05-14 15:12:39 UTC (rev 4565)
@@ -69,7 +69,7 @@
     
     protected JoinNode join;
     protected JoinStrategy joinStrategy;
-    private RelationalNode leftNode;
+    private BlockingFakeRelationalNode leftNode;
     private RelationalNode rightNode;
     
     private FakeDataManager dataMgr;
@@ -791,6 +791,52 @@
         this.join.setJoinStrategy(joinStrategy);
         helpTestJoinDirect(expected, 40, 1);
     }
+    
+    @Test public void testMergeJoinPrefetchAlreadySorted() throws Exception {
+        this.joinType = JoinType.JOIN_INNER;
+        int rows = 50;
+        List[] data = new List[rows];
+        for(int i=0; i<rows; i++) { 
+            data[i] = new ArrayList();
+            Integer value = new Integer((i*17) % 47);
+            data[i].add(value);
+        }
+        this.leftTuples = data;
+        this.rightTuples = new List[] {
+            Arrays.asList(1),  
+            Arrays.asList(2),
+            Arrays.asList(4),
+            Arrays.asList(6),
+            Arrays.asList(7),
+            Arrays.asList(8),
+        };
+        expected = new List[] {
+           Arrays.asList(new Object[] { 1, 1 }),
+           Arrays.asList(new Object[] { 2, 2 }),
+    	   Arrays.asList(new Object[] { 4, 4 }),
+           Arrays.asList(new Object[] { 6, 6 }),
+           Arrays.asList(new Object[] { 7, 7 }),
+           Arrays.asList(new Object[] { 8, 8 }),
+        };
+        helpCreateJoin();               
+        this.joinStrategy = new MergeJoinStrategy(SortOption.SORT, SortOption.ALREADY_SORTED, false);
+        FakeRelationalNode newNode = new FakeRelationalNode(2, rightTuples) {
+        	@Override
+        	public TupleBatch nextBatchDirect() throws BlockedException,
+        			TeiidComponentException, TeiidProcessingException {
+        		TupleBatch tb = super.nextBatchDirect();
+        		if (tb.getTerminationFlag()) {
+        			assertFalse(leftNode.isClosed());
+        		}
+        		return tb;
+        	}
+        };
+        newNode.setElements(rightNode.getElements());
+        rightNode = newNode;
+        
+        this.join.setJoinStrategy(joinStrategy);
+        helpTestJoinDirect(expected, 5, 1);
+    }
 
     @Test public void testRepeatedMerge() throws Exception {
     	helpTestRepeatedMerge(false);



More information about the teiid-commits mailing list