[teiid-commits] teiid SVN: r4337 - in trunk: api/src/main/java/org/teiid/util and 7 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri Aug 17 13:52:58 EDT 2012


Author: shawkins
Date: 2012-08-17 13:52:57 -0400 (Fri, 17 Aug 2012)
New Revision: 4337

Added:
   trunk/api/src/main/java/org/teiid/util/
   trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java
   trunk/api/src/main/java/org/teiid/util/XMLInputStream.java
   trunk/api/src/test/java/org/teiid/util/
   trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java
Modified:
   trunk/common-core/src/main/java/org/teiid/core/util/AccessibleByteArrayOutputStream.java
   trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java
   trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java
   trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
Log:
TEIID-2148 expanding on stream handling

Added: trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java
===================================================================
--- trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java	                        (rev 0)
+++ trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.sql.SQLException;
+
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.Source;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.stax.StAXSource;
+
+import org.teiid.core.types.SQLXMLImpl;
+import org.teiid.core.types.StandardXMLTranslator;
+
+/**
+ * NOTE that this representation of XML does become unreadable after a read operation.
+ */
+public class StAXSQLXML extends SQLXMLImpl {
+	private StAXSource source;
+	
+	public StAXSQLXML(StAXSource source) {
+		this.source = source;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public <T extends Source> T getSource(Class<T> sourceClass) throws SQLException {
+		if (sourceClass == null || sourceClass == StAXSource.class) {
+			if (source == null) {
+				throw new SQLException("Already Freed"); //$NON-NLS-1$
+			}
+			StAXSource result = source;
+			source = null;
+			return (T) result;
+		}
+		return super.getSource(sourceClass);
+	}
+	
+	@Override
+	public String getString() throws SQLException {
+		StringWriter sw = new StringWriter();
+		try {
+			new StandardXMLTranslator(getSource(StAXSource.class)).translate(sw);
+		} catch (TransformerException e) {
+			throw new SQLException(e);
+		} catch (IOException e) {
+			throw new SQLException(e);
+		}
+		return sw.toString();
+	}
+	
+	@Override
+	public InputStream getBinaryStream() throws SQLException {
+		try {
+			return new XMLInputStream(getSource(StAXSource.class), XMLOutputFactory.newFactory());
+		} catch (XMLStreamException e) {
+			throw new SQLException(e);
+		} catch (FactoryConfigurationError e) {
+			throw new SQLException(e);
+		}
+	}
+	
+}
\ No newline at end of file


Property changes on: trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/api/src/main/java/org/teiid/util/XMLInputStream.java
===================================================================
--- trunk/api/src/main/java/org/teiid/util/XMLInputStream.java	                        (rev 0)
+++ trunk/api/src/main/java/org/teiid/util/XMLInputStream.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -0,0 +1,83 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLEventWriter;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.XMLEvent;
+import javax.xml.transform.stax.StAXSource;
+
+import org.teiid.core.types.XMLType;
+import org.teiid.core.util.AccessibleByteArrayOutputStream;
+
+public class XMLInputStream extends InputStream {
+	private static final int BUFFER_SIZE = 1<<13;
+	private int pos = 0;
+	private AccessibleByteArrayOutputStream baos = new AccessibleByteArrayOutputStream(BUFFER_SIZE);
+	private XMLEventReader reader;
+	private XMLEventWriter writer;
+	
+	public XMLInputStream(StAXSource source, XMLOutputFactory outFactory) throws XMLStreamException {
+		reader = source.getXMLEventReader();
+		if (reader == null) {
+			this.reader = XMLType.getXmlInputFactory().createXMLEventReader(source.getXMLStreamReader());
+		}
+		this.writer = outFactory.createXMLEventWriter(baos);
+	}
+	
+	@Override
+	public int read() throws IOException {
+		while (pos >= baos.getCount()) {
+			if (!reader.hasNext()) {
+				return -1;
+			}
+			if (baos.getCount() > BUFFER_SIZE) {
+				baos.setCount(0);
+				pos = 0;
+			}
+			try {
+				XMLEvent event = reader.nextEvent();
+				writer.add(event);
+				writer.flush();
+			} catch (XMLStreamException e) {
+				throw new IOException(e);
+			}
+		}
+		return baos.getBuffer()[pos++];
+	}
+	
+	@Override
+	public void close() throws IOException {
+		try {
+			reader.close();
+		} catch (XMLStreamException e) {
+			throw new IOException(e);
+		}
+	}
+
+}
\ No newline at end of file


Property changes on: trunk/api/src/main/java/org/teiid/util/XMLInputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java
===================================================================
--- trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java	                        (rev 0)
+++ trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.util;
+
+import static org.junit.Assert.*;
+
+import java.io.StringReader;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.transform.stax.StAXSource;
+
+import org.junit.Test;
+import org.teiid.core.types.XMLType;
+import org.teiid.core.util.ObjectConverterUtil;
+
+ at SuppressWarnings("nls")
+public class TestXMLInputStream {
+	
+	@Test public void testStreaming() throws Exception {
+		StringBuilder xmlBuilder = new StringBuilder();
+		xmlBuilder.append("<?xml version=\"1.0\"?><root>");
+		for (int i = 0; i < 1000; i++) {
+			xmlBuilder.append("<a></a>");
+			xmlBuilder.append("<b></b>");
+		}
+		xmlBuilder.append("</root>");
+		String xml = xmlBuilder.toString();
+		
+		StAXSource source = new StAXSource(XMLType.getXmlInputFactory().createXMLEventReader(new StringReader(xml)));
+		XMLInputStream is = new XMLInputStream(source, XMLOutputFactory.newFactory());
+		byte[] bytes = ObjectConverterUtil.convertToByteArray(is);
+		assertEquals(xml, new String(bytes, "UTF-8"));
+	}
+
+}


Property changes on: trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/common-core/src/main/java/org/teiid/core/util/AccessibleByteArrayOutputStream.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/util/AccessibleByteArrayOutputStream.java	2012-08-16 17:50:14 UTC (rev 4336)
+++ trunk/common-core/src/main/java/org/teiid/core/util/AccessibleByteArrayOutputStream.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -42,4 +42,8 @@
 		return this.count;
 	}
 	
+	public void setCount(int count) {
+		this.count = count;
+	}
+	
 }
\ No newline at end of file

Modified: trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java
===================================================================
--- trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java	2012-08-16 17:50:14 UTC (rev 4336)
+++ trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -22,8 +22,11 @@
 
 package org.teiid.translator.ws;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.sql.Blob;
 import java.sql.Clob;
+import java.sql.SQLException;
 import java.sql.SQLXML;
 import java.util.Arrays;
 import java.util.List;
@@ -35,6 +38,8 @@
 import javax.xml.ws.handler.MessageContext;
 import javax.xml.ws.http.HTTPBinding;
 
+import org.teiid.core.types.BlobImpl;
+import org.teiid.core.types.BlobType;
 import org.teiid.core.types.ClobImpl;
 import org.teiid.core.types.InputStreamFactory;
 import org.teiid.language.Argument;
@@ -51,6 +56,24 @@
  */
 public class BinaryWSProcedureExecution implements ProcedureExecution {
 	
+	private static final class StreamingBlob extends BlobImpl {
+		InputStream is;
+		
+		public StreamingBlob(InputStream is) {
+			this.is = is;
+		}
+		
+		@Override
+		public InputStream getBinaryStream() throws SQLException {
+			if (is == null) {
+				throw new SQLException("Already Freed."); //$NON-NLS-1$
+			}
+			InputStream result = is;
+			is = null;
+			return result;
+		}
+	}
+
 	RuntimeMetadata metadata;
     ExecutionContext context;
     private Call procedure;
@@ -111,7 +134,15 @@
     
     @Override
     public List<?> getOutputParameterValues() throws TranslatorException {
-        return Arrays.asList(returnValue, returnValue.getContentType());
+    	Object result = returnValue;
+		if (returnValue != null && Boolean.TRUE.equals(procedure.getArguments().get(3).getArgumentValue().getValue())) {
+			try {
+				result = new BlobType(new StreamingBlob(returnValue.getInputStream()));
+			} catch (IOException e) {
+				throw new TranslatorException(e);
+			}
+		}
+        return Arrays.asList(result, returnValue.getContentType());
     }    
     
     public void close() {

Modified: trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java
===================================================================
--- trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java	2012-08-16 17:50:14 UTC (rev 4336)
+++ trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -124,6 +124,8 @@
 			WSConnection conn) throws TranslatorException {
 		Procedure p = metadataFactory.addProcedure("invoke"); //$NON-NLS-1$ 
 		p.setAnnotation("Invokes a webservice that returns an XML result"); //$NON-NLS-1$
+		
+		metadataFactory.addProcedureParameter("result", TypeFacility.RUNTIME_NAMES.XML, Type.ReturnValue, p); //$NON-NLS-1$
 
 		ProcedureParameter param = metadataFactory.addProcedureParameter("binding", TypeFacility.RUNTIME_NAMES.STRING, Type.In, p); //$NON-NLS-1$
 		param.setAnnotation("The invocation binding (HTTP, SOAP11, SOAP12).  May be set or allowed to default to null to use the default binding."); //$NON-NLS-1$
@@ -142,10 +144,15 @@
 		param.setAnnotation("The relative or abolute endpoint to use.  May be set or allowed to default to null to use the default endpoint address."); //$NON-NLS-1$
 		param.setNullType(NullType.Nullable);
 		
-		metadataFactory.addProcedureParameter("result", TypeFacility.RUNTIME_NAMES.XML, Type.ReturnValue, p); //$NON-NLS-1$
+		param = metadataFactory.addProcedureParameter("stream", TypeFacility.RUNTIME_NAMES.BOOLEAN, Type.In, p); //$NON-NLS-1$
+		param.setAnnotation("If the result should be streamed."); //$NON-NLS-1$
+		param.setNullType(NullType.Nullable);
+		param.setDefaultValue("false"); //$NON-NLS-1$
 		
 		p = metadataFactory.addProcedure(INVOKE_HTTP);
 		p.setAnnotation("Invokes a webservice that returns an binary result"); //$NON-NLS-1$
+		
+		metadataFactory.addProcedureParameter("result", TypeFacility.RUNTIME_NAMES.BLOB, Type.ReturnValue, p); //$NON-NLS-1$
 
 		param = metadataFactory.addProcedureParameter("action", TypeFacility.RUNTIME_NAMES.STRING, Type.In, p); //$NON-NLS-1$
 		param.setAnnotation("Sets the HTTP Method (GET, POST - default, etc.)."); //$NON-NLS-1$
@@ -160,7 +167,11 @@
 		param.setAnnotation("The relative or abolute endpoint to use.  May be set or allowed to default to null to use the default endpoint address."); //$NON-NLS-1$
 		param.setNullType(NullType.Nullable);
 		
-		metadataFactory.addProcedureParameter("result", TypeFacility.RUNTIME_NAMES.BLOB, Type.ReturnValue, p); //$NON-NLS-1$
+		param = metadataFactory.addProcedureParameter("stream", TypeFacility.RUNTIME_NAMES.BOOLEAN, Type.In, p); //$NON-NLS-1$
+		param.setAnnotation("If the result should be streamed."); //$NON-NLS-1$
+		param.setNullType(NullType.Nullable);
+		param.setDefaultValue("false"); //$NON-NLS-1$
+		
 		metadataFactory.addProcedureParameter("contentType", TypeFacility.RUNTIME_NAMES.STRING, Type.Out, p); //$NON-NLS-1$	
 	}
 	

Modified: trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java
===================================================================
--- trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java	2012-08-16 17:50:14 UTC (rev 4336)
+++ trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -29,17 +29,19 @@
 import java.util.Arrays;
 import java.util.List;
 
-import javax.xml.transform.Source;
+import javax.xml.stream.XMLStreamException;
 import javax.xml.transform.Transformer;
 import javax.xml.transform.TransformerException;
 import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stax.StAXSource;
 import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
 import javax.xml.ws.Dispatch;
 import javax.xml.ws.WebServiceException;
 import javax.xml.ws.handler.MessageContext;
 
+import org.teiid.core.types.SQLXMLImpl;
 import org.teiid.core.types.XMLType;
+import org.teiid.core.types.XMLType.Type;
 import org.teiid.language.Argument;
 import org.teiid.language.Call;
 import org.teiid.metadata.RuntimeMetadata;
@@ -50,16 +52,17 @@
 import org.teiid.translator.WSConnection;
 import org.teiid.translator.WSConnection.Util;
 import org.teiid.translator.ws.WSExecutionFactory.Binding;
+import org.teiid.util.StAXSQLXML;
 
 /**
  * A soap call executor - handles all styles doc/literal, rpc/encoded etc. 
  */
 public class WSProcedureExecution implements ProcedureExecution {
-	
+
 	RuntimeMetadata metadata;
     ExecutionContext context;
     private Call procedure;
-    private Source returnValue;
+    private StAXSource returnValue;
     private WSConnection conn;
     private WSExecutionFactory executionFactory;
     
@@ -80,7 +83,7 @@
         String style = (String)arguments.get(0).getArgumentValue().getValue();
         String action = (String)arguments.get(1).getArgumentValue().getValue();
         XMLType docObject = (XMLType)arguments.get(2).getArgumentValue().getValue();
-        StreamSource source = null;
+        StAXSource source = null;
     	try {
 	        source = convertToSource(docObject);
 	        String endpoint = (String)arguments.get(3).getArgumentValue().getValue();
@@ -96,7 +99,7 @@
 	        	}
 	        }
 	        
-	        Dispatch<StreamSource> dispatch = conn.createDispatch(style, endpoint, StreamSource.class, executionFactory.getDefaultServiceMode()); 
+	        Dispatch<StAXSource> dispatch = conn.createDispatch(style, endpoint, StAXSource.class, executionFactory.getDefaultServiceMode()); 
 	
 			if (Binding.HTTP.getBindingId().equals(style)) {
 				if (action == null) {
@@ -127,23 +130,25 @@
 			
 			if (source == null) {
 				// JBoss Native DispatchImpl throws exception when the source is null
-				source = new StreamSource(new StringReader("<none/>")); //$NON-NLS-1$
+				source = new StAXSource(XMLType.getXmlInputFactory().createXMLEventReader(new StringReader("<none/>"))); //$NON-NLS-1$
 			}
 			this.returnValue = dispatch.invoke(source);
 		} catch (SQLException e) {
 			throw new TranslatorException(e);
 		} catch (WebServiceException e) {
 			throw new TranslatorException(e);
+		} catch (XMLStreamException e) {
+			throw new TranslatorException(e);
 		} finally {
 			Util.closeSource(source);
 		}
     }
 
-	private StreamSource convertToSource(SQLXML xml) throws SQLException {
+	private StAXSource convertToSource(SQLXML xml) throws SQLException {
 		if (xml == null) {
 			return null;
 		}
-		return xml.getSource(StreamSource.class);
+		return xml.getSource(StAXSource.class);
 	}
     
     @Override
@@ -153,7 +158,14 @@
     
     @Override
     public List<?> getOutputParameterValues() throws TranslatorException {
-        return Arrays.asList(returnValue);
+    	Object result = returnValue;
+		if (returnValue != null && Boolean.TRUE.equals(procedure.getArguments().get(4).getArgumentValue().getValue())) {
+			SQLXMLImpl sqlXml = new StAXSQLXML(returnValue);
+			XMLType xml = new XMLType(sqlXml);
+			xml.setType(Type.DOCUMENT);
+			result = xml;
+		}
+        return Arrays.asList(result);
     }    
     
     public void close() {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-08-16 17:50:14 UTC (rev 4336)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -34,11 +34,14 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.activation.DataSource;
+import javax.xml.stream.XMLStreamException;
 import javax.xml.transform.Source;
+import javax.xml.transform.stax.StAXSource;
 import javax.xml.transform.stream.StreamSource;
 
 import org.teiid.client.SourceWarning;
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.FileStoreInputStreamFactory;
 import org.teiid.common.buffer.TupleSource;
@@ -74,6 +77,7 @@
 import org.teiid.translator.DataNotAvailableException;
 import org.teiid.translator.TranslatorException;
 import org.teiid.translator.CacheDirective.Scope;
+import org.teiid.util.XMLInputStream;
 
 
 /**
@@ -86,7 +90,7 @@
  */
 public class DataTierTupleSource implements TupleSource, CompletionListener<AtomicResultsMessage> {
 	
-    private static final class MoreWorkTask implements Runnable {
+	private static final class MoreWorkTask implements Runnable {
 
 		WeakReference<RequestWorkItem> ref;
 
@@ -183,7 +187,7 @@
 				continue;
 			}
 			if (convertToRuntimeType[i]) {
-				Object result = convertToRuntimeType(value, this.schema[i]);
+				Object result = convertToRuntimeType(dtm.getBufferManager(), value, this.schema[i]);
 				if (value == result && !DataTypeManager.DefaultDataClasses.OBJECT.equals(this.schema[i])) {
 					convertToRuntimeType[i] = false;
 				} else {
@@ -216,12 +220,12 @@
 		return row;
 	}
 
-	private Object convertToRuntimeType(Object value, Class<?> desiredType) throws TransformationException {
+	static Object convertToRuntimeType(BufferManager bm, Object value, Class<?> desiredType) throws TransformationException {
 		if (value instanceof DataSource && (!(value instanceof Source) || desiredType != DataTypeManager.DefaultDataClasses.XML)) {
 			if (value instanceof InputStreamFactory) {
 				return new BlobType(new BlobImpl((InputStreamFactory)value));
 			}
-			FileStore fs = dtm.getBufferManager().createFileStore("bytes"); //$NON-NLS-1$
+			FileStore fs = bm.createFileStore("bytes"); //$NON-NLS-1$
 			//TODO: guess at the encoding from the content type
 			FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
 
@@ -241,20 +245,31 @@
 					if (is == null && r != null) {
 						is = new ReaderInputStream(r, Streamable.CHARSET);
 					}
-					final FileStore fs = dtm.getBufferManager().createFileStore("xml"); //$NON-NLS-1$
+					final FileStore fs = bm.createFileStore("xml"); //$NON-NLS-1$
 					final FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
 
 					value = new SaveOnReadInputStream(is, fsisf).getInputStreamFactory();
+				} else if (value instanceof StAXSource) {
+					//TODO: do this lazily.  if the first access to get the STaXSource, then 
+					//it's more efficient to let the processing happen against STaX
+					StAXSource ss = (StAXSource)value;
+					try {
+						final FileStore fs = bm.createFileStore("xml"); //$NON-NLS-1$
+						final FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
+						value = new SaveOnReadInputStream(new XMLInputStream(ss, XMLSystemFunctions.getOutputFactory()), fsisf).getInputStreamFactory();
+					} catch (XMLStreamException e) {
+						throw new TransformationException(e);
+					}
 				} else {
 					//maybe dom or some other source we want to get out of memory
 					StandardXMLTranslator sxt = new StandardXMLTranslator((Source)value);
 					SQLXMLImpl sqlxml;
 					try {
-						sqlxml = XMLSystemFunctions.saveToBufferManager(dtm.getBufferManager(), sxt);
+						sqlxml = XMLSystemFunctions.saveToBufferManager(bm, sxt);
 					} catch (TeiidComponentException e) {
-						 throw new TeiidRuntimeException(e);
+						 throw new TransformationException(e);
 					} catch (TeiidProcessingException e) {
-						 throw new TeiidRuntimeException(e);
+						 throw new TransformationException(e);
 					}
 					return new XMLType(sqlxml);	
 				}

Modified: trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java	2012-08-16 17:50:14 UTC (rev 4336)
+++ trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -93,12 +93,12 @@
 import org.teiid.core.types.XMLTranslator;
 import org.teiid.core.types.XMLType;
 import org.teiid.core.types.XMLType.Type;
-import org.teiid.jdbc.TeiidSQLException;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.eval.Evaluator;
 import org.teiid.query.function.CharsetUtils;
 import org.teiid.query.util.CommandContext;
 import org.teiid.translator.WSConnection.Util;
+import org.teiid.util.StAXSQLXML;
 
 
 /** 
@@ -855,47 +855,41 @@
     	JSONParser parser = new JSONParser();
     	final JsonToXmlContentHandler reader = new JsonToXmlContentHandler(rootName, r, parser, threadLocalEventtFactory.get());
 
+    	SQLXMLImpl sqlXml = null;
 		if (stream) {
-			//jre 1.7 event logic does not set a dummy location and throws an NPE in StAXSource, so we explicitly set a location
-			reader.eventFactory.setLocation(dummyLocation);
-			return new SQLXMLImpl() {
-				@SuppressWarnings("unchecked")
-				public <T extends Source> T getSource(Class<T> sourceClass) throws SQLException {
-					if (sourceClass == null || sourceClass == StAXSource.class) {
-						StAXSource source;
-						try {
-							source = new StAXSource(reader);
-						} catch (XMLStreamException e) {
-							throw TeiidSQLException.create(e);
-						}
-						return (T) source;
-					}
-					throw new AssertionError("unsupported source type"); //$NON-NLS-1$
+			try {
+				//jre 1.7 event logic does not set a dummy location and throws an NPE in StAXSource, so we explicitly set a location
+				//the streaming result will be directly consumed, so there's no danger that we're stepping on another location
+				reader.eventFactory.setLocation(dummyLocation);
+				sqlXml = new StAXSQLXML(new StAXSource(reader));
+			} catch (XMLStreamException e) {
+				throw new TeiidProcessingException(e);
+			}
+		} else {
+			sqlXml = XMLSystemFunctions.saveToBufferManager(context.getBufferManager(), new XMLTranslator() {
+				
+				@Override
+				public void translate(Writer writer) throws TransformerException,
+						IOException {
+			    	try {
+						XMLOutputFactory factory = getOutputFactory();
+						final XMLEventWriter streamWriter = factory.createXMLEventWriter(writer);
+	
+				    	streamWriter.add(reader);
+						streamWriter.flush(); //woodstox needs a flush rather than a close
+					} catch (XMLStreamException e) {
+						throw new TransformerException(e);
+					} finally {
+			    		try {
+		    				r.close();
+			    		} catch (IOException e) {
+			    			
+			    		}
+			    	}
 				}
-			};
+			});
 		}
-		XMLType result = new XMLType(XMLSystemFunctions.saveToBufferManager(context.getBufferManager(), new XMLTranslator() {
-			
-			@Override
-			public void translate(Writer writer) throws TransformerException,
-					IOException {
-		    	try {
-					XMLOutputFactory factory = getOutputFactory();
-					final XMLEventWriter streamWriter = factory.createXMLEventWriter(writer);
-
-			    	streamWriter.add(reader);
-					streamWriter.flush(); //woodstox needs a flush rather than a close
-				} catch (XMLStreamException e) {
-					throw new TransformerException(e);
-				} finally {
-		    		try {
-	    				r.close();
-		    		} catch (IOException e) {
-		    			
-		    		}
-		    	}
-			}
-		}));
+		XMLType result = new XMLType(sqlXml);
 		result.setType(Type.DOCUMENT);
 		return result;
 	}

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2012-08-16 17:50:14 UTC (rev 4336)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java	2012-08-17 17:52:57 UTC (rev 4337)
@@ -24,8 +24,12 @@
 
 import static org.junit.Assert.*;
 
+import java.io.StringReader;
 import java.util.List;
 
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.transform.stream.StreamSource;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -35,12 +39,19 @@
 import org.teiid.client.RequestMessage;
 import org.teiid.client.SourceWarning;
 import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.common.buffer.TupleSource;
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.types.BlobType;
 import org.teiid.core.types.ClobType;
+import org.teiid.core.types.DataTypeManager;
 import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.Streamable;
+import org.teiid.core.types.XMLType;
 import org.teiid.core.types.InputStreamFactory.StorageMode;
+import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
 import org.teiid.dqp.internal.datamgr.FakeTransactionService;
 import org.teiid.dqp.message.AtomicRequestMessage;
@@ -87,14 +98,13 @@
     
     private DataTierTupleSource helpSetup(String sql, int nodeId) throws Exception {
         helpSetupDataTierManager();
-        AtomicRequestMessage request = helpSetupRequest(sql, nodeId);
+        AtomicRequestMessage request = helpSetupRequest(sql, nodeId, RealMetadataFactory.exampleBQTCached());
         request.setSerial(serial);
         return new DataTierTupleSource(request, workItem, connectorManager.registerRequest(request), dtm, limit);
     }
 
-	private AtomicRequestMessage helpSetupRequest(String sql, int nodeId) throws Exception {
-		QueryMetadataInterface metadata = RealMetadataFactory.exampleBQTCached();
-        DQPWorkContext workContext = RealMetadataFactory.buildWorkContext(metadata, vdb);
+	private AtomicRequestMessage helpSetupRequest(String sql, int nodeId, QueryMetadataInterface metadata) throws Exception {
+		DQPWorkContext workContext = RealMetadataFactory.buildWorkContext(metadata, vdb);
         
         Command command = helpGetCommand(sql, metadata);
         
@@ -293,10 +303,12 @@
     @Test public void testCaching() throws Exception {
     	assertEquals(0, connectorManager.getExecuteCount().get());
 
+    	QueryMetadataInterface metadata = RealMetadataFactory.exampleBQTCached();
+    	
     	CacheDirective cd = new CacheDirective();
     	this.connectorManager.cacheDirective = cd;
     	helpSetupDataTierManager();
-    	Command command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1).getCommand();
+    	Command command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1, metadata).getCommand();
     	RegisterRequestParameter rrp = new RegisterRequestParameter();
     	rrp.connectorBindingId = "x";
     	TupleSource ts = dtm.registerRequest(context, command, "foo", rrp);
@@ -311,7 +323,7 @@
     	assertEquals(1, this.rm.getRsCache().getTotalCacheEntries());
     	
     	//same session, should be cached
-    	command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1).getCommand();
+    	command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1, metadata).getCommand();
     	rrp = new RegisterRequestParameter();
     	rrp.connectorBindingId = "x";
     	ts = dtm.registerRequest(context, command, "foo", rrp);
@@ -321,7 +333,7 @@
     	assertTrue(rrp.doNotCache);
     	
     	//switch sessions
-    	command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1).getCommand();
+    	command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1, metadata).getCommand();
     	this.context.getSession().setSessionId("different");
     	rrp = new RegisterRequestParameter();
     	rrp.connectorBindingId = "x";
@@ -338,4 +350,24 @@
     	assertEquals(2, this.rm.getRsCache().getTotalCacheEntries());
     }
     
+    @Test public void testTypeConversion() throws Exception {
+    	BufferManager bm = BufferManagerFactory.getStandaloneBufferManager();
+    	
+    	String str = "hello world";
+    	
+    	Object source = new StreamSource(new StringReader(str));
+    	XMLType xml = (XMLType) DataTierTupleSource.convertToRuntimeType(bm, source, DataTypeManager.DefaultDataClasses.XML);
+    	assertEquals(str, xml.getString());
+    	
+    	source = new StAXSource(XMLType.getXmlInputFactory().createXMLEventReader(new StringReader("<a/>")));
+    	xml = (XMLType) DataTierTupleSource.convertToRuntimeType(bm, source, DataTypeManager.DefaultDataClasses.XML);
+    	assertEquals("<?xml version=\"1.0\"?><a></a>", xml.getString());
+    	
+    	byte[] bytes = str.getBytes(Streamable.ENCODING);
+		source = new InputStreamFactory.BlobInputStreamFactory(BlobType.createBlob(bytes));
+    	BlobType blob = (BlobType) DataTierTupleSource.convertToRuntimeType(bm, source, DataTypeManager.DefaultDataClasses.BLOB);
+    	
+    	assertArrayEquals(bytes, ObjectConverterUtil.convertToByteArray(blob.getBinaryStream()));
+    }
+    
 }



More information about the teiid-commits mailing list