[teiid-commits] teiid SVN: r4573 - in branches/7.7.x/engine/src: main/java/org/teiid/query/processor/relational and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu May 30 23:45:35 EDT 2013


Author: jolee
Date: 2013-05-30 23:45:35 -0400 (Thu, 30 May 2013)
New Revision: 4573

Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
   branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestSQLXMLProcessing.java
Log:
Timing issue when processing data from an XML stream

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2013-05-23 19:56:46 UTC (rev 4572)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2013-05-31 03:45:35 UTC (rev 4573)
@@ -30,6 +30,7 @@
 import org.teiid.client.ResizingArrayList;
 import org.teiid.common.buffer.LobManager.ReferenceMode;
 import org.teiid.core.TeiidComponentException;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.types.Streamable;
 import org.teiid.core.util.Assertion;
@@ -42,6 +43,73 @@
 
 public class TupleBuffer {
 	
+	public interface RowSource {
+		void finalRow() throws TeiidComponentException, TeiidProcessingException;
+	}
+	
+	public final class TupleBufferTupleSource extends
+			AbstractTupleSource {
+		private final boolean singleUse;
+		private boolean noBlocking;
+		private boolean reverse;
+
+		private TupleBufferTupleSource(boolean singleUse) {
+			this.singleUse = singleUse;
+		}
+
+		@Override
+		protected List<?> finalRow() throws TeiidComponentException, TeiidProcessingException {
+			if(isFinal || noBlocking || reverse) {
+		        return null;
+		    } 
+			if (rowSourceLock == null) { 
+				throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
+			}
+			synchronized (rowSourceLock) {
+				rowSourceLock.finalRow();
+				return getCurrentTuple();
+			}
+		}
+
+		@Override
+		public int available() {
+			if (!reverse) {
+				return rowCount - getCurrentIndex() + 1;
+			}
+			return getCurrentIndex();
+		}
+
+		@Override
+		protected TupleBatch getBatch(int row) throws TeiidComponentException {
+			return TupleBuffer.this.getBatch(row);
+		}
+
+		@Override
+		public void closeSource() {
+			super.closeSource();
+			if (singleUse) {
+				remove();
+			}
+		}
+		
+		public void setNoBlocking(boolean noBlocking) {
+			this.noBlocking = noBlocking;
+		}
+		
+		public void setReverse(boolean reverse) {
+			this.reverse = reverse;
+		}
+		
+		@Override
+		public int getCurrentIndex() {
+			if (!reverse) {
+				return super.getCurrentIndex();
+			}
+			return getRowCount() - super.getCurrentIndex() + 1;
+		}
+		
+	}
+
 	/**
      * Gets the data type names for each of the input expressions, in order.
      * @param expressions List of Expressions
@@ -75,6 +143,7 @@
 
 	private LobManager lobManager;
 	private String uuid;
+	private RowSource rowSourceLock;
 	
 	public TupleBuffer(BatchManager manager, String id, List<? extends Expression> schema, LobManager lobManager, int batchSize) {
 		this.manager = manager;
@@ -84,6 +153,10 @@
 		this.batchSize = batchSize;		
 	}
 	
+	public void setRowSourceLock(RowSource rowSourceLock) {
+		this.rowSourceLock = rowSourceLock;
+	}
+	
 	public void setInlineLobs(boolean inline) {
 		if (this.lobManager != null) {
 			this.lobManager.setInlineLobs(inline);
@@ -311,34 +384,7 @@
 		if (singleUse) {
 			setForwardOnly(true);
 		}
-		return new AbstractTupleSource() {
-			
-			@Override
-			protected List<?> finalRow() throws BlockedException {
-				if(isFinal) {
-		            return null;
-		        } 
-		        throw BlockedException.blockWithTrace("Blocking on non-final TupleBuffer", tupleSourceID, "size", getRowCount()); //$NON-NLS-1$ //$NON-NLS-2$
-			}
-			
-			@Override
-			public int available() {
-				return rowCount - getCurrentIndex() + 1;
-			}
-			
-			@Override
-			protected TupleBatch getBatch(int row) throws TeiidComponentException {
-				return TupleBuffer.this.getBatch(row);
-			}
-			
-			@Override
-			public void closeSource() {
-				super.closeSource();
-				if (singleUse) {
-					remove();
-				}
-			}
-		};
+		return new TupleBufferTupleSource(singleUse);
 	}
 	
 	@Override

Modified: branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java	2013-05-23 19:56:46 UTC (rev 4572)
+++ branches/7.7.x/engine/src/main/java/org/teiid/query/processor/relational/XMLTableNode.java	2013-05-31 03:45:35 UTC (rev 4573)
@@ -25,7 +25,6 @@
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -46,9 +45,9 @@
 
 import org.teiid.api.exception.query.ExpressionEvaluationException;
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.common.buffer.TupleBatch;
 import org.teiid.common.buffer.TupleBuffer;
-import org.teiid.common.buffer.BufferManager.TupleSourceType;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
@@ -60,9 +59,9 @@
 import org.teiid.query.function.FunctionDescriptor;
 import org.teiid.query.sql.lang.XMLTable;
 import org.teiid.query.sql.lang.XMLTable.XMLColumn;
-import org.teiid.query.xquery.saxon.XQueryEvaluator;
 import org.teiid.query.xquery.saxon.SaxonXQueryExpression.Result;
 import org.teiid.query.xquery.saxon.SaxonXQueryExpression.RowProcessor;
+import org.teiid.query.xquery.saxon.XQueryEvaluator;
 
 /**
  * Handles xml table processing.
@@ -70,7 +69,7 @@
  * When streaming the results will be fully built and stored in a buffer
  * before being returned
  */
-public class XMLTableNode extends SubqueryAwareRelationalNode implements RowProcessor {
+public class XMLTableNode extends SubqueryAwareRelationalNode implements RowProcessor, TupleBuffer.RowSource {
 
 	private static Map<Class<?>, BuiltInAtomicType> typeMapping = new HashMap<Class<?>, BuiltInAtomicType>();
 	
@@ -232,12 +231,20 @@
 						}
 					} finally {
 						synchronized (XMLTableNode.this) {
+							if (buffer != null && asynchException == null) {
+								try {
+									buffer.close();
+								} catch (TeiidComponentException e) {
+									asynchException = new TeiidRuntimeException(e);
+								}
+							}
 							state = State.DONE;
 							XMLTableNode.this.notifyAll();
 						}
 					}
 				}
 			};
+			this.buffer.setRowSourceLock(this);
 			this.getContext().getExecutor().execute(r);
 			return;
 		}
@@ -247,6 +254,21 @@
 			unwrapException(e);
 		}
 	}
+	
+	@Override
+	public synchronized void finalRow() throws TeiidComponentException,
+			TeiidProcessingException {
+		while (state == State.BUILDING) {
+			try {
+				this.wait();
+			} catch (InterruptedException e) {
+				throw new TeiidRuntimeException(e);
+			}
+		}
+		if (this.asynchException != null) {
+			unwrapException(this.asynchException);
+		}
+	}
 
 	private List<?> processRow() throws ExpressionEvaluationException, BlockedException,
 			TeiidComponentException, TeiidProcessingException {

Modified: branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestSQLXMLProcessing.java
===================================================================
--- branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestSQLXMLProcessing.java	2013-05-23 19:56:46 UTC (rev 4572)
+++ branches/7.7.x/engine/src/test/java/org/teiid/query/processor/TestSQLXMLProcessing.java	2013-05-31 03:45:35 UTC (rev 4573)
@@ -33,10 +33,15 @@
 import java.util.List;
 import java.util.TimeZone;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.teiid.api.exception.query.ExpressionEvaluationException;
+import org.teiid.client.util.ResultsFuture;
+import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.BlobImpl;
 import org.teiid.core.types.BlobType;
 import org.teiid.core.types.DataTypeManager;
@@ -53,6 +58,7 @@
 import org.teiid.query.sql.symbol.Expression;
 import org.teiid.query.unittest.RealMetadataFactory;
 import org.teiid.query.unittest.TimestampUtil;
+import org.teiid.query.util.CommandContext;
 
 @SuppressWarnings({"nls", "unchecked"})
 public class TestSQLXMLProcessing {
@@ -357,6 +363,66 @@
         process(sql, expected);
     }
     
+    @Test public void testXmlTableStreamingTiming() throws Throwable {
+    	String sql = "select xmlserialize(x.object_value as string), y.x from xmltable('/a/b' passing xmlparse(document '<a><b x=''1''/><b x=''2''/></a>')) as x, (select 1 as x) as y"; //$NON-NLS-1$
+        
+        final List<?>[] expected = new List<?>[] {
+        		Arrays.asList("<b xmlns=\"\" x=\"1\"/>", 1),
+        		Arrays.asList("<b xmlns=\"\" x=\"2\"/>", 1)
+        };    
+    
+        executeStreaming(sql, expected);
+    }
+    
+    @Test(expected=TeiidProcessingException.class) public void testXmlTableStreamingTimingWithError() throws Throwable {
+    	String sql = "select x.x, y.x from xmltable('/a/b' passing xmlparse(document '<a><b x=''1''/><b x=''2''/></a>') columns x integer path '1 div (@x - 1)') as x, (select 1 as x) as y"; //$NON-NLS-1$
+        
+        final List<?>[] expected = new List<?>[] {
+        		Arrays.asList(1, 1),
+        		Arrays.asList(2, 1)
+        };    
+    
+        executeStreaming(sql, expected);
+    }
+
+	private void executeStreaming(String sql, final List<?>[] expected)
+			throws Throwable {
+		final CommandContext cc = createCommandContext();
+        final ResultsFuture<Runnable> r = new ResultsFuture<Runnable>();
+        Executor ex = new Executor() {
+        	
+			@Override
+			public void execute(Runnable command) {
+				r.getResultsReceiver().receiveResults(command);
+			}
+		};
+        cc.setExecutor(ex);
+		final ProcessorPlan plan = helpGetPlan(helpParse(sql), RealMetadataFactory.example1Cached(), new DefaultCapabilitiesFinder(), cc);
+		final ResultsFuture<Void> result = new ResultsFuture<Void>();
+		Thread t = new Thread() {
+			@Override
+			public void run() {
+				try {
+					doProcess(plan, dataManager, expected, cc);
+					result.getResultsReceiver().receiveResults(null);
+				} catch (Throwable e) {
+					result.getResultsReceiver().exceptionOccurred(e);
+				}
+			}
+		};
+		t.start();
+		Runnable runnable = r.get();
+		runnable.run();
+		try {
+			result.get();
+		} catch (ExecutionException e) {
+			if (e.getCause() != null) {
+				throw e.getCause();
+			}
+			throw e;
+		}
+	}
+    
     @Test public void testXmlNameEscaping() throws Exception {
     	String sql = "select xmlforest(\"xml\") from (select 1 as \"xml\") x"; //$NON-NLS-1$
         



More information about the teiid-commits mailing list