Author: shawkins
Date: 2012-08-17 13:52:57 -0400 (Fri, 17 Aug 2012)
New Revision: 4337
Added:
trunk/api/src/main/java/org/teiid/util/
trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java
trunk/api/src/main/java/org/teiid/util/XMLInputStream.java
trunk/api/src/test/java/org/teiid/util/
trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java
Modified:
trunk/common-core/src/main/java/org/teiid/core/util/AccessibleByteArrayOutputStream.java
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
Log:
TEIID-2148 expanding on stream handling
Added: trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java
===================================================================
--- trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java (rev
0)
+++ trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java 2012-08-17 17:52:57 UTC (rev
4337)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.sql.SQLException;
+
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.transform.Source;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.stax.StAXSource;
+
+import org.teiid.core.types.SQLXMLImpl;
+import org.teiid.core.types.StandardXMLTranslator;
+
+/**
+ * A {@link SQLXMLImpl} that wraps a single {@link StAXSource}.
+ * NOTE that this representation of XML does become unreadable after a read operation,
+ * since the wrapped source is handed out only once.
+ */
+public class StAXSQLXML extends SQLXMLImpl {
+    private StAXSource source;
+
+    /**
+     * @param source the StAX source to expose through the SQLXML accessors
+     */
+    public StAXSQLXML(StAXSource source) {
+        this.source = source;
+    }
+
+    /**
+     * Hands out the wrapped source when the requested class is null or
+     * {@link StAXSource}, clearing the reference so it cannot be handed out again.
+     * Requests for any other source class are passed to the superclass.
+     *
+     * @throws SQLException if the wrapped source has already been handed out
+     */
+    @SuppressWarnings("unchecked")
+    public <T extends Source> T getSource(Class<T> sourceClass) throws SQLException {
+        boolean staxRequested = sourceClass == null || sourceClass == StAXSource.class;
+        if (!staxRequested) {
+            return super.getSource(sourceClass);
+        }
+        if (source == null) {
+            throw new SQLException("Already Freed"); //$NON-NLS-1$
+        }
+        StAXSource handedOut = source;
+        source = null;
+        return (T) handedOut;
+    }
+
+    /**
+     * Runs the wrapped source through a {@link StandardXMLTranslator} into a string.
+     * Takes the source via {@link #getSource(Class)}, so it can only succeed once.
+     */
+    @Override
+    public String getString() throws SQLException {
+        StringWriter out = new StringWriter();
+        StAXSource stax = getSource(StAXSource.class);
+        try {
+            new StandardXMLTranslator(stax).translate(out);
+        } catch (TransformerException e) {
+            throw new SQLException(e);
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+        return out.toString();
+    }
+
+    /**
+     * Wraps the source in an {@link XMLInputStream} that uses a newly created output factory.
+     * Takes the source via {@link #getSource(Class)}, so it can only succeed once.
+     */
+    @Override
+    public InputStream getBinaryStream() throws SQLException {
+        try {
+            StAXSource stax = getSource(StAXSource.class);
+            return new XMLInputStream(stax, XMLOutputFactory.newFactory());
+        } catch (XMLStreamException e) {
+            throw new SQLException(e);
+        } catch (FactoryConfigurationError e) {
+            throw new SQLException(e);
+        }
+    }
+
+}
\ No newline at end of file
Property changes on: trunk/api/src/main/java/org/teiid/util/StAXSQLXML.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/api/src/main/java/org/teiid/util/XMLInputStream.java
===================================================================
--- trunk/api/src/main/java/org/teiid/util/XMLInputStream.java
(rev 0)
+++ trunk/api/src/main/java/org/teiid/util/XMLInputStream.java 2012-08-17 17:52:57 UTC
(rev 4337)
@@ -0,0 +1,83 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLEventWriter;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.XMLEvent;
+import javax.xml.transform.stax.StAXSource;
+
+import org.teiid.core.types.XMLType;
+import org.teiid.core.util.AccessibleByteArrayOutputStream;
+
+/**
+ * Exposes a {@link StAXSource} as an {@link InputStream}. Events are pulled from the
+ * source one at a time and written through an {@link XMLEventWriter} into an in-memory
+ * buffer, from which the bytes are served.
+ */
+public class XMLInputStream extends InputStream {
+    private static final int BUFFER_SIZE = 1<<13;
+    private int pos = 0;
+    private AccessibleByteArrayOutputStream baos = new AccessibleByteArrayOutputStream(BUFFER_SIZE);
+    private XMLEventReader reader;
+    private XMLEventWriter writer;
+
+    /**
+     * @param source the StAX source to read; its event reader is used when present,
+     *        otherwise an event reader is created over its stream reader
+     * @param outFactory creates the event writer that serializes events into the buffer
+     * @throws XMLStreamException if the event reader or the event writer cannot be created
+     */
+    public XMLInputStream(StAXSource source, XMLOutputFactory outFactory) throws XMLStreamException {
+        reader = source.getXMLEventReader();
+        if (reader == null) {
+            this.reader = XMLType.getXmlInputFactory().createXMLEventReader(source.getXMLStreamReader());
+        }
+        this.writer = outFactory.createXMLEventWriter(baos);
+    }
+
+    /**
+     * Returns the next serialized byte as a value in the range 0-255, or -1 once the
+     * buffer has been fully read and the event reader has no further events.
+     *
+     * @throws IOException wrapping any {@link XMLStreamException} raised while pulling
+     *         or writing an event
+     */
+    @Override
+    public int read() throws IOException {
+        while (pos >= baos.getCount()) {
+            if (!reader.hasNext()) {
+                return -1;
+            }
+            if (baos.getCount() > BUFFER_SIZE) {
+                //everything buffered so far has been read (pos >= count), so start over
+                //at the beginning of the buffer rather than letting it keep growing
+                baos.setCount(0);
+                pos = 0;
+            }
+            try {
+                XMLEvent event = reader.nextEvent();
+                writer.add(event);
+                //flush so the event's bytes are in the buffer before the count is rechecked
+                writer.flush();
+            } catch (XMLStreamException e) {
+                throw new IOException(e);
+            }
+        }
+        //mask to an unsigned value as InputStream.read() requires; a raw byte >= 0x80 is
+        //negative and 0xFF would be returned as -1, which callers treat as end of stream
+        return baos.getBuffer()[pos++] & 0xff;
+    }
+
+    /**
+     * Closes the underlying event reader.
+     *
+     * @throws IOException wrapping any {@link XMLStreamException} raised by the reader
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            reader.close();
+        } catch (XMLStreamException e) {
+            throw new IOException(e);
+        }
+    }
+
+}
\ No newline at end of file
Property changes on: trunk/api/src/main/java/org/teiid/util/XMLInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java
===================================================================
--- trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java
(rev 0)
+++ trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java 2012-08-17 17:52:57 UTC
(rev 4337)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.util;
+
+import static org.junit.Assert.*;
+
+import java.io.StringReader;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.transform.stax.StAXSource;
+
+import org.junit.Test;
+import org.teiid.core.types.XMLType;
+import org.teiid.core.util.ObjectConverterUtil;
+
+@SuppressWarnings("nls")
+public class TestXMLInputStream {
+
+    /**
+     * Builds a document with 1000 pairs of empty a/b elements, reads it back through
+     * an XMLInputStream over a StAX event reader, and expects the identical text.
+     */
+    @Test public void testStreaming() throws Exception {
+        StringBuilder doc = new StringBuilder();
+        doc.append("<?xml version=\"1.0\"?><root>");
+        int pairsLeft = 1000;
+        while (pairsLeft-- > 0) {
+            doc.append("<a></a>").append("<b></b>");
+        }
+        doc.append("</root>");
+        String expected = doc.toString();
+
+        StringReader input = new StringReader(expected);
+        StAXSource source = new StAXSource(XMLType.getXmlInputFactory().createXMLEventReader(input));
+        XMLInputStream stream = new XMLInputStream(source, XMLOutputFactory.newFactory());
+        byte[] actual = ObjectConverterUtil.convertToByteArray(stream);
+        assertEquals(expected, new String(actual, "UTF-8"));
+    }
+
+}
Property changes on: trunk/api/src/test/java/org/teiid/util/TestXMLInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/common-core/src/main/java/org/teiid/core/util/AccessibleByteArrayOutputStream.java
===================================================================
---
trunk/common-core/src/main/java/org/teiid/core/util/AccessibleByteArrayOutputStream.java 2012-08-16
17:50:14 UTC (rev 4336)
+++
trunk/common-core/src/main/java/org/teiid/core/util/AccessibleByteArrayOutputStream.java 2012-08-17
17:52:57 UTC (rev 4337)
@@ -42,4 +42,8 @@
return this.count;
}
+ /**
+ * Overwrites this stream's count field with the given value.
+ * NOTE(review): presumably count is the number of valid bytes in the buffer, so a
+ * smaller value discards the trailing bytes - confirm against the superclass.
+ * No validation is performed on the value here.
+ */
+ public void setCount(int count) {
+ this.count = count;
+ }
+
}
\ No newline at end of file
Modified:
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java
===================================================================
---
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java 2012-08-16
17:50:14 UTC (rev 4336)
+++
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/BinaryWSProcedureExecution.java 2012-08-17
17:52:57 UTC (rev 4337)
@@ -22,8 +22,11 @@
package org.teiid.translator.ws;
+import java.io.IOException;
+import java.io.InputStream;
import java.sql.Blob;
import java.sql.Clob;
+import java.sql.SQLException;
import java.sql.SQLXML;
import java.util.Arrays;
import java.util.List;
@@ -35,6 +38,8 @@
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.http.HTTPBinding;
+import org.teiid.core.types.BlobImpl;
+import org.teiid.core.types.BlobType;
import org.teiid.core.types.ClobImpl;
import org.teiid.core.types.InputStreamFactory;
import org.teiid.language.Argument;
@@ -51,6 +56,24 @@
*/
public class BinaryWSProcedureExecution implements ProcedureExecution {
+ /**
+  * A blob over a single input stream; the stream is handed out at most once.
+  */
+ private static final class StreamingBlob extends BlobImpl {
+     InputStream is;
+
+     public StreamingBlob(InputStream is) {
+         this.is = is;
+     }
+
+     /**
+      * Returns the wrapped stream and clears the reference to it.
+      * @throws SQLException if the stream was already handed out
+      */
+     @Override
+     public InputStream getBinaryStream() throws SQLException {
+         InputStream handedOut = is;
+         if (handedOut == null) {
+             throw new SQLException("Already Freed."); //$NON-NLS-1$
+         }
+         is = null;
+         return handedOut;
+     }
+ }
+
RuntimeMetadata metadata;
ExecutionContext context;
private Call procedure;
@@ -111,7 +134,15 @@
@Override
public List<?> getOutputParameterValues() throws TranslatorException {
- return Arrays.asList(returnValue, returnValue.getContentType());
+ // Outputs are the result followed by its content type.
+ // When argument index 3 is Boolean.TRUE and a return value exists, the result is
+ // wrapped as a BlobType over a one-shot StreamingBlob instead of being returned as is.
+ Object result = returnValue;
+ if (returnValue != null && Boolean.TRUE.equals(procedure.getArguments().get(3).getArgumentValue().getValue())) {
+     try {
+         result = new BlobType(new StreamingBlob(returnValue.getInputStream()));
+     } catch (IOException e) {
+         throw new TranslatorException(e);
+     }
+ }
+ // returnValue is treated as nullable above, so guard the content type lookup as well
+ // rather than failing with a NullPointerException; both outputs are then null
+ return Arrays.asList(result, returnValue != null ? returnValue.getContentType() : null);
}
public void close() {
Modified:
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java
===================================================================
---
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java 2012-08-16
17:50:14 UTC (rev 4336)
+++
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSExecutionFactory.java 2012-08-17
17:52:57 UTC (rev 4337)
@@ -124,6 +124,8 @@
WSConnection conn) throws TranslatorException {
Procedure p = metadataFactory.addProcedure("invoke"); //$NON-NLS-1$
p.setAnnotation("Invokes a webservice that returns an XML result");
//$NON-NLS-1$
+
+ metadataFactory.addProcedureParameter("result",
TypeFacility.RUNTIME_NAMES.XML, Type.ReturnValue, p); //$NON-NLS-1$
ProcedureParameter param = metadataFactory.addProcedureParameter("binding",
TypeFacility.RUNTIME_NAMES.STRING, Type.In, p); //$NON-NLS-1$
param.setAnnotation("The invocation binding (HTTP, SOAP11, SOAP12). May be set or
allowed to default to null to use the default binding."); //$NON-NLS-1$
@@ -142,10 +144,15 @@
param.setAnnotation("The relative or abolute endpoint to use. May be set or
allowed to default to null to use the default endpoint address."); //$NON-NLS-1$
param.setNullType(NullType.Nullable);
- metadataFactory.addProcedureParameter("result",
TypeFacility.RUNTIME_NAMES.XML, Type.ReturnValue, p); //$NON-NLS-1$
+ param = metadataFactory.addProcedureParameter("stream",
TypeFacility.RUNTIME_NAMES.BOOLEAN, Type.In, p); //$NON-NLS-1$
+ param.setAnnotation("If the result should be streamed."); //$NON-NLS-1$
+ param.setNullType(NullType.Nullable);
+ param.setDefaultValue("false"); //$NON-NLS-1$
p = metadataFactory.addProcedure(INVOKE_HTTP);
p.setAnnotation("Invokes a webservice that returns an binary result");
//$NON-NLS-1$
+
+ metadataFactory.addProcedureParameter("result",
TypeFacility.RUNTIME_NAMES.BLOB, Type.ReturnValue, p); //$NON-NLS-1$
param = metadataFactory.addProcedureParameter("action",
TypeFacility.RUNTIME_NAMES.STRING, Type.In, p); //$NON-NLS-1$
param.setAnnotation("Sets the HTTP Method (GET, POST - default, etc.).");
//$NON-NLS-1$
@@ -160,7 +167,11 @@
param.setAnnotation("The relative or abolute endpoint to use. May be set or
allowed to default to null to use the default endpoint address."); //$NON-NLS-1$
param.setNullType(NullType.Nullable);
- metadataFactory.addProcedureParameter("result",
TypeFacility.RUNTIME_NAMES.BLOB, Type.ReturnValue, p); //$NON-NLS-1$
+ param = metadataFactory.addProcedureParameter("stream",
TypeFacility.RUNTIME_NAMES.BOOLEAN, Type.In, p); //$NON-NLS-1$
+ param.setAnnotation("If the result should be streamed."); //$NON-NLS-1$
+ param.setNullType(NullType.Nullable);
+ param.setDefaultValue("false"); //$NON-NLS-1$
+
metadataFactory.addProcedureParameter("contentType",
TypeFacility.RUNTIME_NAMES.STRING, Type.Out, p); //$NON-NLS-1$
}
Modified:
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java
===================================================================
---
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java 2012-08-16
17:50:14 UTC (rev 4336)
+++
trunk/connectors/translator-ws/src/main/java/org/teiid/translator/ws/WSProcedureExecution.java 2012-08-17
17:52:57 UTC (rev 4337)
@@ -29,17 +29,19 @@
import java.util.Arrays;
import java.util.List;
-import javax.xml.transform.Source;
+import javax.xml.stream.XMLStreamException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.stax.StAXSource;
import javax.xml.transform.stream.StreamResult;
-import javax.xml.transform.stream.StreamSource;
import javax.xml.ws.Dispatch;
import javax.xml.ws.WebServiceException;
import javax.xml.ws.handler.MessageContext;
+import org.teiid.core.types.SQLXMLImpl;
import org.teiid.core.types.XMLType;
+import org.teiid.core.types.XMLType.Type;
import org.teiid.language.Argument;
import org.teiid.language.Call;
import org.teiid.metadata.RuntimeMetadata;
@@ -50,16 +52,17 @@
import org.teiid.translator.WSConnection;
import org.teiid.translator.WSConnection.Util;
import org.teiid.translator.ws.WSExecutionFactory.Binding;
+import org.teiid.util.StAXSQLXML;
/**
* A soap call executor - handles all styles doc/literal, rpc/encoded etc.
*/
public class WSProcedureExecution implements ProcedureExecution {
-
+
RuntimeMetadata metadata;
ExecutionContext context;
private Call procedure;
- private Source returnValue;
+ private StAXSource returnValue;
private WSConnection conn;
private WSExecutionFactory executionFactory;
@@ -80,7 +83,7 @@
String style = (String)arguments.get(0).getArgumentValue().getValue();
String action = (String)arguments.get(1).getArgumentValue().getValue();
XMLType docObject = (XMLType)arguments.get(2).getArgumentValue().getValue();
- StreamSource source = null;
+ StAXSource source = null;
try {
source = convertToSource(docObject);
String endpoint = (String)arguments.get(3).getArgumentValue().getValue();
@@ -96,7 +99,7 @@
}
}
- Dispatch<StreamSource> dispatch = conn.createDispatch(style, endpoint,
StreamSource.class, executionFactory.getDefaultServiceMode());
+ Dispatch<StAXSource> dispatch = conn.createDispatch(style, endpoint,
StAXSource.class, executionFactory.getDefaultServiceMode());
if (Binding.HTTP.getBindingId().equals(style)) {
if (action == null) {
@@ -127,23 +130,25 @@
if (source == null) {
// JBoss Native DispatchImpl throws exception when the source is null
- source = new StreamSource(new StringReader("<none/>"));
//$NON-NLS-1$
+ source = new StAXSource(XMLType.getXmlInputFactory().createXMLEventReader(new
StringReader("<none/>"))); //$NON-NLS-1$
}
this.returnValue = dispatch.invoke(source);
} catch (SQLException e) {
throw new TranslatorException(e);
} catch (WebServiceException e) {
throw new TranslatorException(e);
+ } catch (XMLStreamException e) {
+ throw new TranslatorException(e);
} finally {
Util.closeSource(source);
}
}
- private StreamSource convertToSource(SQLXML xml) throws SQLException {
+ private StAXSource convertToSource(SQLXML xml) throws SQLException {
if (xml == null) {
return null;
}
- return xml.getSource(StreamSource.class);
+ return xml.getSource(StAXSource.class);
}
@Override
@@ -153,7 +158,14 @@
@Override
public List<?> getOutputParameterValues() throws TranslatorException {
- return Arrays.asList(returnValue);
+ // Returns a single-element list holding the invocation result.
+ // By default the result is the raw return value (possibly null).
+ Object result = returnValue;
+ // When argument index 4 is Boolean.TRUE and a return value exists, the StAX source is
+ // wrapped in a StAXSQLXML-backed XMLType marked as a DOCUMENT instead.
+ if (returnValue != null &&
Boolean.TRUE.equals(procedure.getArguments().get(4).getArgumentValue().getValue())) {
+ SQLXMLImpl sqlXml = new StAXSQLXML(returnValue);
+ XMLType xml = new XMLType(sqlXml);
+ xml.setType(Type.DOCUMENT);
+ result = xml;
+ }
+ return Arrays.asList(result);
}
public void close() {
Modified:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-08-16
17:50:14 UTC (rev 4336)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-08-17
17:52:57 UTC (rev 4337)
@@ -34,11 +34,14 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.activation.DataSource;
+import javax.xml.stream.XMLStreamException;
import javax.xml.transform.Source;
+import javax.xml.transform.stax.StAXSource;
import javax.xml.transform.stream.StreamSource;
import org.teiid.client.SourceWarning;
import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.FileStoreInputStreamFactory;
import org.teiid.common.buffer.TupleSource;
@@ -74,6 +77,7 @@
import org.teiid.translator.DataNotAvailableException;
import org.teiid.translator.TranslatorException;
import org.teiid.translator.CacheDirective.Scope;
+import org.teiid.util.XMLInputStream;
/**
@@ -86,7 +90,7 @@
*/
public class DataTierTupleSource implements TupleSource,
CompletionListener<AtomicResultsMessage> {
- private static final class MoreWorkTask implements Runnable {
+ private static final class MoreWorkTask implements Runnable {
WeakReference<RequestWorkItem> ref;
@@ -183,7 +187,7 @@
continue;
}
if (convertToRuntimeType[i]) {
- Object result = convertToRuntimeType(value, this.schema[i]);
+ Object result = convertToRuntimeType(dtm.getBufferManager(), value, this.schema[i]);
if (value == result &&
!DataTypeManager.DefaultDataClasses.OBJECT.equals(this.schema[i])) {
convertToRuntimeType[i] = false;
} else {
@@ -216,12 +220,12 @@
return row;
}
- private Object convertToRuntimeType(Object value, Class<?> desiredType) throws
TransformationException {
+ static Object convertToRuntimeType(BufferManager bm, Object value, Class<?>
desiredType) throws TransformationException {
if (value instanceof DataSource && (!(value instanceof Source) || desiredType
!= DataTypeManager.DefaultDataClasses.XML)) {
if (value instanceof InputStreamFactory) {
return new BlobType(new BlobImpl((InputStreamFactory)value));
}
- FileStore fs = dtm.getBufferManager().createFileStore("bytes");
//$NON-NLS-1$
+ FileStore fs = bm.createFileStore("bytes"); //$NON-NLS-1$
//TODO: guess at the encoding from the content type
FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs,
Streamable.ENCODING);
@@ -241,20 +245,31 @@
if (is == null && r != null) {
is = new ReaderInputStream(r, Streamable.CHARSET);
}
- final FileStore fs = dtm.getBufferManager().createFileStore("xml");
//$NON-NLS-1$
+ final FileStore fs = bm.createFileStore("xml"); //$NON-NLS-1$
final FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs,
Streamable.ENCODING);
value = new SaveOnReadInputStream(is, fsisf).getInputStreamFactory();
+ } else if (value instanceof StAXSource) {
+ //TODO: do this lazily. if the first access is to get the StAXSource, then
+ //it's more efficient to let the processing happen against StAX
+ StAXSource ss = (StAXSource)value;
+ try {
+ final FileStore fs = bm.createFileStore("xml"); //$NON-NLS-1$
+ final FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs,
Streamable.ENCODING);
+ value = new SaveOnReadInputStream(new XMLInputStream(ss,
XMLSystemFunctions.getOutputFactory()), fsisf).getInputStreamFactory();
+ } catch (XMLStreamException e) {
+ throw new TransformationException(e);
+ }
} else {
//maybe dom or some other source we want to get out of memory
StandardXMLTranslator sxt = new StandardXMLTranslator((Source)value);
SQLXMLImpl sqlxml;
try {
- sqlxml = XMLSystemFunctions.saveToBufferManager(dtm.getBufferManager(), sxt);
+ sqlxml = XMLSystemFunctions.saveToBufferManager(bm, sxt);
} catch (TeiidComponentException e) {
- throw new TeiidRuntimeException(e);
+ throw new TransformationException(e);
} catch (TeiidProcessingException e) {
- throw new TeiidRuntimeException(e);
+ throw new TransformationException(e);
}
return new XMLType(sqlxml);
}
Modified:
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java 2012-08-16
17:50:14 UTC (rev 4336)
+++
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java 2012-08-17
17:52:57 UTC (rev 4337)
@@ -93,12 +93,12 @@
import org.teiid.core.types.XMLTranslator;
import org.teiid.core.types.XMLType;
import org.teiid.core.types.XMLType.Type;
-import org.teiid.jdbc.TeiidSQLException;
import org.teiid.query.QueryPlugin;
import org.teiid.query.eval.Evaluator;
import org.teiid.query.function.CharsetUtils;
import org.teiid.query.util.CommandContext;
import org.teiid.translator.WSConnection.Util;
+import org.teiid.util.StAXSQLXML;
/**
@@ -855,47 +855,41 @@
JSONParser parser = new JSONParser();
final JsonToXmlContentHandler reader = new JsonToXmlContentHandler(rootName, r,
parser, threadLocalEventtFactory.get());
+ SQLXMLImpl sqlXml = null;
if (stream) {
- //jre 1.7 event logic does not set a dummy location and throws an NPE in StAXSource,
so we explicitly set a location
- reader.eventFactory.setLocation(dummyLocation);
- return new SQLXMLImpl() {
- @SuppressWarnings("unchecked")
- public <T extends Source> T getSource(Class<T> sourceClass) throws
SQLException {
- if (sourceClass == null || sourceClass == StAXSource.class) {
- StAXSource source;
- try {
- source = new StAXSource(reader);
- } catch (XMLStreamException e) {
- throw TeiidSQLException.create(e);
- }
- return (T) source;
- }
- throw new AssertionError("unsupported source type"); //$NON-NLS-1$
+ try {
+ //jre 1.7 event logic does not set a dummy location and throws an NPE in StAXSource,
so we explicitly set a location
+ //the streaming result will be directly consumed, so there's no danger that
we're stepping on another location
+ reader.eventFactory.setLocation(dummyLocation);
+ sqlXml = new StAXSQLXML(new StAXSource(reader));
+ } catch (XMLStreamException e) {
+ throw new TeiidProcessingException(e);
+ }
+ } else {
+ sqlXml = XMLSystemFunctions.saveToBufferManager(context.getBufferManager(), new
XMLTranslator() {
+
+ @Override
+ public void translate(Writer writer) throws TransformerException,
+ IOException {
+ try {
+ XMLOutputFactory factory = getOutputFactory();
+ final XMLEventWriter streamWriter = factory.createXMLEventWriter(writer);
+
+ streamWriter.add(reader);
+ streamWriter.flush(); //woodstox needs a flush rather than a close
+ } catch (XMLStreamException e) {
+ throw new TransformerException(e);
+ } finally {
+ try {
+ r.close();
+ } catch (IOException e) {
+
+ }
+ }
}
- };
+ });
}
- XMLType result = new
XMLType(XMLSystemFunctions.saveToBufferManager(context.getBufferManager(), new
XMLTranslator() {
-
- @Override
- public void translate(Writer writer) throws TransformerException,
- IOException {
- try {
- XMLOutputFactory factory = getOutputFactory();
- final XMLEventWriter streamWriter = factory.createXMLEventWriter(writer);
-
- streamWriter.add(reader);
- streamWriter.flush(); //woodstox needs a flush rather than a close
- } catch (XMLStreamException e) {
- throw new TransformerException(e);
- } finally {
- try {
- r.close();
- } catch (IOException e) {
-
- }
- }
- }
- }));
+ XMLType result = new XMLType(sqlXml);
result.setType(Type.DOCUMENT);
return result;
}
Modified:
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2012-08-16
17:50:14 UTC (rev 4336)
+++
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDataTierManager.java 2012-08-17
17:52:57 UTC (rev 4337)
@@ -24,8 +24,12 @@
import static org.junit.Assert.*;
+import java.io.StringReader;
import java.util.List;
+import javax.xml.transform.stax.StAXSource;
+import javax.xml.transform.stream.StreamSource;
+
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -35,12 +39,19 @@
import org.teiid.client.RequestMessage;
import org.teiid.client.SourceWarning;
import org.teiid.common.buffer.BlockedException;
+import org.teiid.common.buffer.BufferManager;
+import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.TupleSource;
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.types.BlobType;
import org.teiid.core.types.ClobType;
+import org.teiid.core.types.DataTypeManager;
import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.Streamable;
+import org.teiid.core.types.XMLType;
import org.teiid.core.types.InputStreamFactory.StorageMode;
+import org.teiid.core.util.ObjectConverterUtil;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
import org.teiid.dqp.internal.datamgr.FakeTransactionService;
import org.teiid.dqp.message.AtomicRequestMessage;
@@ -87,14 +98,13 @@
private DataTierTupleSource helpSetup(String sql, int nodeId) throws Exception {
helpSetupDataTierManager();
- AtomicRequestMessage request = helpSetupRequest(sql, nodeId);
+ AtomicRequestMessage request = helpSetupRequest(sql, nodeId,
RealMetadataFactory.exampleBQTCached());
request.setSerial(serial);
return new DataTierTupleSource(request, workItem,
connectorManager.registerRequest(request), dtm, limit);
}
- private AtomicRequestMessage helpSetupRequest(String sql, int nodeId) throws Exception
{
- QueryMetadataInterface metadata = RealMetadataFactory.exampleBQTCached();
- DQPWorkContext workContext = RealMetadataFactory.buildWorkContext(metadata,
vdb);
+ private AtomicRequestMessage helpSetupRequest(String sql, int nodeId,
QueryMetadataInterface metadata) throws Exception {
+ DQPWorkContext workContext = RealMetadataFactory.buildWorkContext(metadata, vdb);
Command command = helpGetCommand(sql, metadata);
@@ -293,10 +303,12 @@
@Test public void testCaching() throws Exception {
assertEquals(0, connectorManager.getExecuteCount().get());
+ QueryMetadataInterface metadata = RealMetadataFactory.exampleBQTCached();
+
CacheDirective cd = new CacheDirective();
this.connectorManager.cacheDirective = cd;
helpSetupDataTierManager();
- Command command = helpSetupRequest("SELECT stringkey from bqt1.smalla",
1).getCommand();
+ Command command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1,
metadata).getCommand();
RegisterRequestParameter rrp = new RegisterRequestParameter();
rrp.connectorBindingId = "x";
TupleSource ts = dtm.registerRequest(context, command, "foo", rrp);
@@ -311,7 +323,7 @@
assertEquals(1, this.rm.getRsCache().getTotalCacheEntries());
//same session, should be cached
- command = helpSetupRequest("SELECT stringkey from bqt1.smalla",
1).getCommand();
+ command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1,
metadata).getCommand();
rrp = new RegisterRequestParameter();
rrp.connectorBindingId = "x";
ts = dtm.registerRequest(context, command, "foo", rrp);
@@ -321,7 +333,7 @@
assertTrue(rrp.doNotCache);
//switch sessions
- command = helpSetupRequest("SELECT stringkey from bqt1.smalla",
1).getCommand();
+ command = helpSetupRequest("SELECT stringkey from bqt1.smalla", 1,
metadata).getCommand();
this.context.getSession().setSessionId("different");
rrp = new RegisterRequestParameter();
rrp.connectorBindingId = "x";
@@ -338,4 +350,24 @@
assertEquals(2, this.rm.getRsCache().getTotalCacheEntries());
}
+ /**
+  * Checks convertToRuntimeType for a StreamSource and a StAXSource converted to XML,
+  * and for a blob-backed InputStreamFactory converted to BLOB.
+  */
+ @Test public void testTypeConversion() throws Exception {
+     BufferManager bufferManager = BufferManagerFactory.getStandaloneBufferManager();
+     String text = "hello world";
+
+     //character stream source: the text is expected back unchanged
+     Object streamSource = new StreamSource(new StringReader(text));
+     XMLType fromStream = (XMLType) DataTierTupleSource.convertToRuntimeType(bufferManager, streamSource, DataTypeManager.DefaultDataClasses.XML);
+     assertEquals(text, fromStream.getString());
+
+     //StAX source: the expected text has an xml declaration and an expanded empty element
+     StringReader staxInput = new StringReader("<a/>");
+     Object staxSource = new StAXSource(XMLType.getXmlInputFactory().createXMLEventReader(staxInput));
+     XMLType fromStax = (XMLType) DataTierTupleSource.convertToRuntimeType(bufferManager, staxSource, DataTypeManager.DefaultDataClasses.XML);
+     assertEquals("<?xml version=\"1.0\"?><a></a>", fromStax.getString());
+
+     //blob-backed input stream factory: the bytes are expected back unchanged
+     byte[] expectedBytes = text.getBytes(Streamable.ENCODING);
+     Object blobSource = new InputStreamFactory.BlobInputStreamFactory(BlobType.createBlob(expectedBytes));
+     BlobType blob = (BlobType) DataTierTupleSource.convertToRuntimeType(bufferManager, blobSource, DataTypeManager.DefaultDataClasses.BLOB);
+
+     byte[] actualBytes = ObjectConverterUtil.convertToByteArray(blob.getBinaryStream());
+     assertArrayEquals(expectedBytes, actualBytes);
+ }
+
}