teiid SVN: r4328 - in trunk: engine/src/main/java/org/teiid/common/buffer and 4 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2012-08-16 09:32:05 -0400 (Thu, 16 Aug 2012)
New Revision: 4328
Added:
trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java
Modified:
trunk/common-core/src/main/java/org/teiid/core/types/StandardXMLTranslator.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
Log:
TEIID-2148 adding a saveonreadinputstream
Modified: trunk/common-core/src/main/java/org/teiid/core/types/StandardXMLTranslator.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/StandardXMLTranslator.java 2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/common-core/src/main/java/org/teiid/core/types/StandardXMLTranslator.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -32,6 +32,16 @@
import javax.xml.transform.stream.StreamResult;
public class StandardXMLTranslator extends XMLTranslator {
+ //one TransformerFactory per thread, created on first use, instead of calling TransformerFactory.newInstance() for every translation
+ private static ThreadLocal<TransformerFactory> threadLocalTransformerFactory = new ThreadLocal<TransformerFactory>() {
+ protected TransformerFactory initialValue() {
+ return TransformerFactory.newInstance();
+ }
+ };
+ //returns the calling thread's TransformerFactory so that other classes can share it
+ public static TransformerFactory getThreadLocalTransformerFactory() {
+ return threadLocalTransformerFactory.get();
+ }
private Source source;
@@ -41,7 +51,7 @@
@Override
public void translate(Writer writer) throws TransformerException, IOException {
- Transformer t = TransformerFactory.newInstance().newTransformer();
+ Transformer t = threadLocalTransformerFactory.get().newTransformer();
t.transform(source, new StreamResult(writer));
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -42,6 +42,7 @@
private int count;
private boolean bytesWritten;
private boolean closed;
+ private byte[] singleByte = new byte[1];
public FileStoreOutputStream(int size) {
this.buffer = new byte[size];
@@ -49,7 +50,8 @@
@Override
public void write(int b) throws IOException {
- write(new byte[b], 0, 1);
+ singleByte[0] = (byte)b;
+ write(singleByte, 0, 1);
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -47,11 +47,19 @@
}
@Override
- public InputStream getInputStream() throws IOException {
+ public InputStream getInputStream() {
+ return getInputStream(0);
+ }
+
+ public InputStream getInputStream(long start) {
if (fsos != null && !fsos.bytesWritten()) {
- return new ByteArrayInputStream(fsos.getBuffer(), 0, fsos.getCount());
+ if (start > Integer.MAX_VALUE) {
+ throw new AssertionError("Invalid start " + start); //$NON-NLS-1$
+ }
+ int s = (int)start;
+ return new ByteArrayInputStream(fsos.getBuffer(), s, fsos.getCount() - s);
}
- return lobBuffer.createInputStream(0);
+ return lobBuffer.createInputStream(start);
}
@Override
@@ -89,7 +97,7 @@
}
@Override
- public void free() throws IOException {
+ public void free() {
lobBuffer.remove();
}
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -23,6 +23,8 @@
package org.teiid.dqp.internal.process;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.List;
@@ -33,6 +35,7 @@
import javax.activation.DataSource;
import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
import org.teiid.client.SourceWarning;
import org.teiid.common.buffer.BlockedException;
@@ -52,7 +55,7 @@
import org.teiid.core.types.TransformationException;
import org.teiid.core.types.XMLType;
import org.teiid.core.util.Assertion;
-import org.teiid.core.util.ObjectConverterUtil;
+import org.teiid.core.util.ReaderInputStream;
import org.teiid.dqp.internal.datamgr.ConnectorWork;
import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
import org.teiid.dqp.internal.process.DQPCore.FutureWork;
@@ -221,28 +224,42 @@
FileStore fs = dtm.getBufferManager().createFileStore("bytes"); //$NON-NLS-1$
//TODO: guess at the encoding from the content type
FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
-
+
try {
- ObjectConverterUtil.write(fsisf.getOuputStream(), ((DataSource)value).getInputStream(), -1);
+ SaveOnReadInputStream is = new SaveOnReadInputStream(((DataSource)value).getInputStream(), fsisf);
+ return new BlobType(new BlobImpl(is.getInputStreamFactory()));
} catch (IOException e) {
- throw new TransformationException(QueryPlugin.Event.TEIID30500, e, e.getMessage());
+ throw new TransformationException(QueryPlugin.Event.TEIID30500, e, e.getMessage());
}
- return new BlobType(new BlobImpl(fsisf));
}
if (value instanceof Source) {
- if (value instanceof InputStreamFactory) {
- return new XMLType(new SQLXMLImpl((InputStreamFactory)value));
+ if (!(value instanceof InputStreamFactory)) {
+ if (value instanceof StreamSource) {
+ StreamSource ss = (StreamSource)value;
+ InputStream is = ss.getInputStream();
+ Reader r = ss.getReader();
+ if (is == null && r != null) {
+ is = new ReaderInputStream(r, Streamable.CHARSET);
+ }
+ final FileStore fs = dtm.getBufferManager().createFileStore("xml"); //$NON-NLS-1$
+ final FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
+
+ value = new SaveOnReadInputStream(is, fsisf).getInputStreamFactory();
+ } else {
+ //maybe dom or some other source we want to get out of memory
+ StandardXMLTranslator sxt = new StandardXMLTranslator((Source)value);
+ SQLXMLImpl sqlxml;
+ try {
+ sqlxml = XMLSystemFunctions.saveToBufferManager(dtm.getBufferManager(), sxt);
+ } catch (TeiidComponentException e) {
+ throw new TeiidRuntimeException(e);
+ } catch (TeiidProcessingException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ return new XMLType(sqlxml);
+ }
}
- StandardXMLTranslator sxt = new StandardXMLTranslator((Source)value);
- SQLXMLImpl sqlxml;
- try {
- sqlxml = XMLSystemFunctions.saveToBufferManager(dtm.getBufferManager(), sxt);
- } catch (TeiidComponentException e) {
- throw new TeiidRuntimeException(e);
- } catch (TeiidProcessingException e) {
- throw new TeiidRuntimeException(e);
- }
- return new XMLType(sqlxml);
+ return new XMLType(new SQLXMLImpl((InputStreamFactory)value));
}
return DataTypeManager.convertToRuntimeType(value);
}
Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.teiid.common.buffer.FileStoreInputStreamFactory;
+import org.teiid.common.buffer.FileStore.FileStoreOutputStream;
+import org.teiid.core.types.InputStreamFactory;
+
+/**
+ * An {@link InputStream} wrapper that saves the input on read (into the output stream of the supplied {@link FileStoreInputStreamFactory}) and provides a {@link InputStreamFactory} over the saved content.
+ */
+final class SaveOnReadInputStream extends FilterInputStream {
+ //the stream handed to the first consumer; its delegate can be swapped from the live source to the saved copy
+ class SwitchingInputStream extends FilterInputStream {
+ //initially reads through the enclosing SaveOnReadInputStream so that everything consumed is also saved
+ protected SwitchingInputStream() {
+ super(SaveOnReadInputStream.this);
+ }
+ //repoints this stream at a different delegate
+ public void setIn(InputStream is) {
+ this.in = is;
+ }
+
+ }
+ //saved: end of stream was reached or close() gave up and freed fsisf; read: a read on the source is in progress; returned: sis has been handed out
+ private SwitchingInputStream sis = new SwitchingInputStream();
+ private final FileStoreInputStreamFactory fsisf;
+ private FileStoreOutputStream fsos;
+ private boolean saved;
+ private boolean read;
+ private boolean returned;
+ //while unsaved the first getInputStream call returns the live stream; every other call returns a stream over the saved bytes
+ InputStreamFactory inputStreamFactory = new InputStreamFactory() {
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ if (!saved) {
+ if (!returned) {
+ returned = true;
+ return sis;
+ }
+ //another stream was requested before the first one reached the end - save the rest of the stream
+ SaveOnReadInputStream.this.fsos.flush();
+ long start = SaveOnReadInputStream.this.fsisf.getLength();
+ SaveOnReadInputStream.this.close(); //force the pending read
+ InputStream is = SaveOnReadInputStream.this.fsisf.getInputStream(start);
+ sis.setIn(is); //the first consumer continues from the saved copy at the length that had been saved so far
+ }
+ return fsisf.getInputStream();
+ }
+ //drains an unsaved stream before asking fsisf for its storage mode; reports OTHER if draining fails
+ @Override
+ public StorageMode getStorageMode() {
+ if (!saved) {
+ try {
+ getInputStream().close();
+ } catch (IOException e) {
+ return StorageMode.OTHER;
+ }
+ }
+ return fsisf.getStorageMode();
+ }
+ };
+ //wraps the source stream; fsisf supplies the output stream that receives the copy
+ SaveOnReadInputStream(InputStream in,
+ FileStoreInputStreamFactory fsisf) {
+ super(in);
+ this.fsisf = fsisf;
+ fsos = fsisf.getOuputStream();
+ }
+ //reads one byte and records it; every value 0..255 is data (including 0), only -1 signals the end of the stream
+ @Override
+ public int read() throws IOException {
+ read = true;
+ int i = super.read();
+ read = false;
+ if (i >= 0) {
+ fsos.write(i);
+ } else {
+ saved = true;
+ }
+ return i;
+ }
+ //bulk read that records exactly the bytes returned; -1 marks the content as completely saved
+ @Override
+ public int read(byte[] b, int off, int len)
+ throws IOException {
+ read = true;
+ int bytes = super.read(b, off, len);
+ read = false;
+ if (bytes > 0) {
+ fsos.write(b, off, bytes);
+ } else if (bytes == -1) {
+ saved = true;
+ }
+ return bytes;
+ }
+ //reads and saves whatever is left unless a read is in progress; if the content is still incomplete afterwards fsisf is freed
+ @Override
+ public void close() throws IOException {
+ try {
+ if (!saved && !read) {
+ byte[] bytes = new byte[1<<13];
+ while (!saved) {
+ read(bytes, 0, bytes.length);
+ }
+ }
+ fsos.close();
+ } finally {
+ if (!saved) {
+ fsisf.free();
+ saved = true;
+ }
+ super.close();
+ }
+ }
+ //returns the factory backed by this stream
+ InputStreamFactory getInputStreamFactory() {
+ return inputStreamFactory;
+ }
+}
\ No newline at end of file
Property changes on: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java 2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -88,6 +88,7 @@
import org.teiid.core.types.ClobImpl;
import org.teiid.core.types.ClobType;
import org.teiid.core.types.SQLXMLImpl;
+import org.teiid.core.types.StandardXMLTranslator;
import org.teiid.core.types.Streamable;
import org.teiid.core.types.XMLTranslator;
import org.teiid.core.types.XMLType;
@@ -326,11 +327,6 @@
}
}
- private static ThreadLocal<TransformerFactory> threadLocalTransformerFactory = new ThreadLocal<TransformerFactory>() {
- protected TransformerFactory initialValue() {
- return TransformerFactory.newInstance();
- }
- };
static ThreadLocal<XMLOutputFactory> threadLocalOutputFactory = new ThreadLocal<XMLOutputFactory>() {
protected XMLOutputFactory initialValue() {
return newXmlOutputFactory();
@@ -370,7 +366,7 @@
styleSource = convertToSource(styleSheet);
xmlSource = convertToSource(xml);
final Source xmlParam = xmlSource;
- TransformerFactory factory = threadLocalTransformerFactory.get();
+ TransformerFactory factory = StandardXMLTranslator.getThreadLocalTransformerFactory();
final Transformer transformer = factory.newTransformer(styleSource);
//this creates a non-validated sqlxml - it may not be valid xml/root-less xml
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java 2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -22,10 +22,11 @@
package org.teiid.dqp.internal.process;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
-import junit.framework.TestCase;
-
+import org.junit.Test;
import org.mockito.Mockito;
import org.teiid.client.RequestMessage;
import org.teiid.client.RequestMessage.StatementType;
@@ -46,32 +47,19 @@
import org.teiid.query.tempdata.TempTableStore.TransactionMode;
import org.teiid.query.unittest.RealMetadataFactory;
+public class TestRequest {
-
-/**
- * @since 4.2
- */
-public class TestRequest extends TestCase {
-
private static final TempTableStore TEMP_TABLE_STORE = new TempTableStore("1", TransactionMode.ISOLATE_WRITES); //$NON-NLS-1$
private final static String QUERY = "SELECT * FROM pm1.g1"; //$NON-NLS-1$
/**
- * Constructor for TestRequest.
- * @param name
- */
- public TestRequest(String name) {
- super(name);
- }
-
- /**
* Test Request.validateEntitlement().
* Make sure that this can be called both before and after metadata is initialized.
* See defect 17209.
* @throws Exception
* @since 4.2
*/
- public void testValidateEntitlement() throws Exception {
+ @Test public void testValidateEntitlement() throws Exception {
QueryMetadataInterface metadata = RealMetadataFactory.example1Cached();
Request request = new Request();
@@ -99,7 +87,7 @@
* @throws Exception
* @since 4.2
*/
- public void testProcessRequest() throws Exception {
+ @Test public void testProcessRequest() throws Exception {
QueryMetadataInterface metadata = RealMetadataFactory.example1Cached();
//Try before plan is cached.
@@ -115,7 +103,7 @@
helpProcessMessage(message, null, workContext);
}
- public void testCommandContext() throws Exception {
+ @Test public void testCommandContext() throws Exception {
QueryMetadataInterface metadata = RealMetadataFactory.example1Cached();
//Try before plan is cached.
@@ -153,7 +141,7 @@
* @throws Exception
* @since 4.2
*/
- public void testProcessRequestPreparedStatement() throws Exception {
+ @Test public void testProcessRequestPreparedStatement() throws Exception {
QueryMetadataInterface metadata = RealMetadataFactory.example1Cached();
SessionAwareCache<PreparedPlan> cache = new SessionAwareCache<PreparedPlan>();
Added: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java (rev 0)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Test;
+import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.common.buffer.FileStore;
+import org.teiid.common.buffer.FileStoreInputStreamFactory;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.Streamable;
+import org.teiid.core.types.InputStreamFactory.StorageMode;
+import org.teiid.core.util.ObjectConverterUtil;
+
+@SuppressWarnings("nls")
+public class TestSaveOnReadInputStream {
+ //a completely consumed first stream leaves a saved copy that a later stream from the same factory replays
+ @Test public void testSave() throws IOException {
+ SaveOnReadInputStream soris = getSaveOnReadInputStream();
+ InputStreamFactory isf = soris.getInputStreamFactory();
+ InputStream is = isf.getInputStream();
+ assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is), Streamable.CHARSET));
+ InputStream is2 = isf.getInputStream();
+ assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is2), Streamable.CHARSET));
+ }
+ //requesting a second stream after a partial read: the first stream yields the remainder, later streams yield the full content
+ @Test public void testPartialReadSave() throws IOException {
+ SaveOnReadInputStream soris = getSaveOnReadInputStream();
+ InputStreamFactory isf = soris.getInputStreamFactory();
+ InputStream is = isf.getInputStream();
+ is.read();
+
+ InputStream is2 = isf.getInputStream();
+ assertEquals("ello world", new String(ObjectConverterUtil.convertToByteArray(is), Streamable.CHARSET));
+ assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is2), Streamable.CHARSET));
+ InputStream is3 = isf.getInputStream();
+ assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is3), Streamable.CHARSET));
+ }
+ //asking for the storage mode before any read reports MEMORY and the full content can still be read afterwards
+ @Test public void testStorageMode() throws IOException {
+ SaveOnReadInputStream soris = getSaveOnReadInputStream();
+ InputStreamFactory isf = soris.getInputStreamFactory();
+
+ assertEquals(StorageMode.MEMORY, isf.getStorageMode());
+
+ InputStream is = isf.getInputStream();
+ assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is), Streamable.CHARSET));
+ }
+ //builds a SaveOnReadInputStream over the bytes of "hello world" with a file store from the standalone buffer manager
+ private SaveOnReadInputStream getSaveOnReadInputStream() {
+ FileStore fs = BufferManagerFactory.getStandaloneBufferManager().createFileStore("test");
+ FileStoreInputStreamFactory factory = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
+
+ InputStream is = new ByteArrayInputStream("hello world".getBytes(Streamable.CHARSET));
+
+ SaveOnReadInputStream soris = new SaveOnReadInputStream(is, factory);
+ return soris;
+ }
+
+}
Property changes on: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java 2012-08-16 13:32:05 UTC (rev 4328)
@@ -37,7 +37,6 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.RollbackException;
import javax.transaction.Synchronization;