[teiid-commits] teiid SVN: r4328 - in trunk: engine/src/main/java/org/teiid/common/buffer and 4 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Aug 16 09:32:06 EDT 2012


Author: shawkins
Date: 2012-08-16 09:32:05 -0400 (Thu, 16 Aug 2012)
New Revision: 4328

Added:
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java
Modified:
   trunk/common-core/src/main/java/org/teiid/core/types/StandardXMLTranslator.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
   trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
   trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
Log:
TEIID-2148 adding a saveonreadinputstream

Modified: trunk/common-core/src/main/java/org/teiid/core/types/StandardXMLTranslator.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/StandardXMLTranslator.java	2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/common-core/src/main/java/org/teiid/core/types/StandardXMLTranslator.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -32,6 +32,16 @@
 import javax.xml.transform.stream.StreamResult;
 
 public class StandardXMLTranslator extends XMLTranslator {
+	
+	private static ThreadLocal<TransformerFactory> threadLocalTransformerFactory = new ThreadLocal<TransformerFactory>() {
+		protected TransformerFactory initialValue() {
+			return TransformerFactory.newInstance();
+		}
+	};
+	
+	public static TransformerFactory getThreadLocalTransformerFactory() {
+		return threadLocalTransformerFactory.get();
+	}
     
     private Source source;
         
@@ -41,7 +51,7 @@
     
     @Override
     public void translate(Writer writer) throws TransformerException, IOException {
-        Transformer t = TransformerFactory.newInstance().newTransformer();
+        Transformer t = threadLocalTransformerFactory.get().newTransformer();
         t.transform(source, new StreamResult(writer));
     }
         

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -42,6 +42,7 @@
 		private int count;
 		private boolean bytesWritten;
 		private boolean closed;
+		private byte[] singleByte = new byte[1];
 		
 		public FileStoreOutputStream(int size) {
 			this.buffer = new byte[size];
@@ -49,7 +50,8 @@
 		
 		@Override
 		public void write(int b) throws IOException {
-			write(new byte[b], 0, 1);
+			singleByte[0] = (byte)b;
+			write(singleByte, 0, 1);
 		}
 
 		@Override

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -47,11 +47,19 @@
 	}
 
 	@Override
-	public InputStream getInputStream() throws IOException {
+	public InputStream getInputStream() {
+		return getInputStream(0);
+	}
+	
+	public InputStream getInputStream(long start) {
 		if (fsos != null && !fsos.bytesWritten()) {
-			return new ByteArrayInputStream(fsos.getBuffer(), 0, fsos.getCount());
+			if (start > Integer.MAX_VALUE) {
+				throw new AssertionError("Invalid start " + start); //$NON-NLS-1$
+			}
+			int s = (int)start;
+			return new ByteArrayInputStream(fsos.getBuffer(), s, fsos.getCount() - s);
 		}
-		return lobBuffer.createInputStream(0);
+		return lobBuffer.createInputStream(start);
 	}
 	
 	@Override
@@ -89,7 +97,7 @@
 	}
 
 	@Override
-	public void free() throws IOException {
+	public void free() {
 		lobBuffer.remove();
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/DataTierTupleSource.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -23,6 +23,8 @@
 package org.teiid.dqp.internal.process;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
 import java.lang.ref.WeakReference;
 import java.util.Arrays;
 import java.util.List;
@@ -33,6 +35,7 @@
 
 import javax.activation.DataSource;
 import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
 
 import org.teiid.client.SourceWarning;
 import org.teiid.common.buffer.BlockedException;
@@ -52,7 +55,7 @@
 import org.teiid.core.types.TransformationException;
 import org.teiid.core.types.XMLType;
 import org.teiid.core.util.Assertion;
-import org.teiid.core.util.ObjectConverterUtil;
+import org.teiid.core.util.ReaderInputStream;
 import org.teiid.dqp.internal.datamgr.ConnectorWork;
 import org.teiid.dqp.internal.process.DQPCore.CompletionListener;
 import org.teiid.dqp.internal.process.DQPCore.FutureWork;
@@ -221,28 +224,42 @@
 			FileStore fs = dtm.getBufferManager().createFileStore("bytes"); //$NON-NLS-1$
 			//TODO: guess at the encoding from the content type
 			FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
-			
+
 			try {
-				ObjectConverterUtil.write(fsisf.getOuputStream(), ((DataSource)value).getInputStream(), -1);
+				SaveOnReadInputStream is = new SaveOnReadInputStream(((DataSource)value).getInputStream(), fsisf);
+				return new BlobType(new BlobImpl(is.getInputStreamFactory()));
 			} catch (IOException e) {
-				 throw new TransformationException(QueryPlugin.Event.TEIID30500, e, e.getMessage());
+				throw new TransformationException(QueryPlugin.Event.TEIID30500, e, e.getMessage());
 			}
-			return new BlobType(new BlobImpl(fsisf));
 		}
 		if (value instanceof Source) {
-			if (value instanceof InputStreamFactory) {
-				return new XMLType(new SQLXMLImpl((InputStreamFactory)value));
+			if (!(value instanceof InputStreamFactory)) {
+				if (value instanceof StreamSource) {
+					StreamSource ss = (StreamSource)value;
+					InputStream is = ss.getInputStream();
+					Reader r = ss.getReader();
+					if (is == null && r != null) {
+						is = new ReaderInputStream(r, Streamable.CHARSET);
+					}
+					final FileStore fs = dtm.getBufferManager().createFileStore("xml"); //$NON-NLS-1$
+					final FileStoreInputStreamFactory fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
+
+					value = new SaveOnReadInputStream(is, fsisf).getInputStreamFactory();
+				} else {
+					//maybe dom or some other source we want to get out of memory
+					StandardXMLTranslator sxt = new StandardXMLTranslator((Source)value);
+					SQLXMLImpl sqlxml;
+					try {
+						sqlxml = XMLSystemFunctions.saveToBufferManager(dtm.getBufferManager(), sxt);
+					} catch (TeiidComponentException e) {
+						 throw new TeiidRuntimeException(e);
+					} catch (TeiidProcessingException e) {
+						 throw new TeiidRuntimeException(e);
+					}
+					return new XMLType(sqlxml);	
+				}
 			}
-			StandardXMLTranslator sxt = new StandardXMLTranslator((Source)value);
-			SQLXMLImpl sqlxml;
-			try {
-				sqlxml = XMLSystemFunctions.saveToBufferManager(dtm.getBufferManager(), sxt);
-			} catch (TeiidComponentException e) {
-				 throw new TeiidRuntimeException(e);
-			} catch (TeiidProcessingException e) {
-				 throw new TeiidRuntimeException(e);
-			}
-			return new XMLType(sqlxml);
+			return new XMLType(new SQLXMLImpl((InputStreamFactory)value));
 		}
 		return DataTypeManager.convertToRuntimeType(value);
 	}

Added: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.teiid.common.buffer.FileStoreInputStreamFactory;
+import org.teiid.common.buffer.FileStore.FileStoreOutputStream;
+import org.teiid.core.types.InputStreamFactory;
+
+/**
+ * An {@link InputStream} wrapper that saves the input on read and provides a {@link InputStreamFactory}.
+ */
+final class SaveOnReadInputStream extends FilterInputStream {
+	
+	class SwitchingInputStream extends FilterInputStream {
+
+		protected SwitchingInputStream() {
+			super(SaveOnReadInputStream.this);
+		}
+		
+		public void setIn(InputStream is) {
+			this.in = is;
+		}
+		
+	}
+	
+	private SwitchingInputStream sis = new SwitchingInputStream();
+	private final FileStoreInputStreamFactory fsisf;
+	private FileStoreOutputStream fsos;
+	private boolean saved;
+	private boolean read;
+	private boolean returned;
+
+	InputStreamFactory inputStreamFactory = new InputStreamFactory() {
+		
+		@Override
+		public InputStream getInputStream() throws IOException {
+			if (!saved) {
+				if (!returned) {
+					returned = true;
+					return sis;
+				}
+				//save the rest of the stream
+				SaveOnReadInputStream.this.fsos.flush();
+				long start = SaveOnReadInputStream.this.fsisf.getLength();
+				SaveOnReadInputStream.this.close(); //force the pending read
+				InputStream is = SaveOnReadInputStream.this.fsisf.getInputStream(start);
+				sis.setIn(is);
+			}
+			return fsisf.getInputStream();
+		}
+		
+		@Override
+		public StorageMode getStorageMode() {
+			if (!saved) {
+				try {
+					getInputStream().close();
+				} catch (IOException e) {
+					return StorageMode.OTHER;
+				}
+			}
+			return fsisf.getStorageMode();
+		}
+	};
+
+	SaveOnReadInputStream(InputStream in,
+			FileStoreInputStreamFactory fsisf) {
+		super(in);
+		this.fsisf = fsisf;
+		fsos = fsisf.getOuputStream();
+	}
+
+	@Override
+	public int read() throws IOException {
+		read = true;
+		int i = super.read();
+		read = false;
+		if (i > 0) {
+			fsos.write(i);
+		} else {
+			saved = true;
+		}
+		return i;
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len)
+			throws IOException {
+		read = true;
+		int bytes = super.read(b, off, len);
+		read = false;
+		if (bytes > 0) {
+			fsos.write(b, off, bytes);
+		} else if (bytes == -1) {
+			saved = true;
+		}
+		return bytes;
+	}
+
+	@Override
+	public void close() throws IOException {
+		try {
+			if (!saved && !read) {
+				byte[] bytes = new byte[1<<13];
+				while (!saved) {
+					read(bytes, 0, bytes.length);
+				}
+			}
+			fsos.close();
+		} finally {
+			if (!saved) {
+				fsisf.free();
+				saved = true;
+			}
+			super.close();
+		}
+	}
+	
+	InputStreamFactory getInputStreamFactory() {
+		return inputStreamFactory;
+	}
+}
\ No newline at end of file


Property changes on: trunk/engine/src/main/java/org/teiid/dqp/internal/process/SaveOnReadInputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java	2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -88,6 +88,7 @@
 import org.teiid.core.types.ClobImpl;
 import org.teiid.core.types.ClobType;
 import org.teiid.core.types.SQLXMLImpl;
+import org.teiid.core.types.StandardXMLTranslator;
 import org.teiid.core.types.Streamable;
 import org.teiid.core.types.XMLTranslator;
 import org.teiid.core.types.XMLType;
@@ -326,11 +327,6 @@
 		}
 	}
 	
-	private static ThreadLocal<TransformerFactory> threadLocalTransformerFactory = new ThreadLocal<TransformerFactory>() {
-		protected TransformerFactory initialValue() {
-			return TransformerFactory.newInstance();
-		}
-	};
 	static ThreadLocal<XMLOutputFactory> threadLocalOutputFactory = new ThreadLocal<XMLOutputFactory>() {
 		protected XMLOutputFactory initialValue() {
 			return newXmlOutputFactory();
@@ -370,7 +366,7 @@
 			styleSource = convertToSource(styleSheet);
 			xmlSource = convertToSource(xml);
 			final Source xmlParam = xmlSource;
-			TransformerFactory factory = threadLocalTransformerFactory.get();
+			TransformerFactory factory = StandardXMLTranslator.getThreadLocalTransformerFactory();
             final Transformer transformer = factory.newTransformer(styleSource);
             
 			//this creates a non-validated sqlxml - it may not be valid xml/root-less xml

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java	2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestRequest.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -22,10 +22,11 @@
 
 package org.teiid.dqp.internal.process;
 
+import static org.junit.Assert.*;
+
 import java.util.ArrayList;
 
-import junit.framework.TestCase;
-
+import org.junit.Test;
 import org.mockito.Mockito;
 import org.teiid.client.RequestMessage;
 import org.teiid.client.RequestMessage.StatementType;
@@ -46,32 +47,19 @@
 import org.teiid.query.tempdata.TempTableStore.TransactionMode;
 import org.teiid.query.unittest.RealMetadataFactory;
 
+public class TestRequest {
 
-
-/** 
- * @since 4.2
- */
-public class TestRequest extends TestCase {
-
     private static final TempTableStore TEMP_TABLE_STORE = new TempTableStore("1", TransactionMode.ISOLATE_WRITES); //$NON-NLS-1$
 	private final static String QUERY = "SELECT * FROM pm1.g1";  //$NON-NLS-1$
     
     /**
-     * Constructor for TestRequest.
-     * @param name
-     */
-    public TestRequest(String name) {
-        super(name);
-    }
-    
-    /**
      * Test Request.validateEntitlement().  
      * Make sure that this can be called both before and after metadata is initialized. 
      * See defect 17209.
      * @throws Exception
      * @since 4.2
      */
-    public void testValidateEntitlement() throws Exception {
+    @Test public void testValidateEntitlement() throws Exception {
         QueryMetadataInterface metadata = RealMetadataFactory.example1Cached();
         
         Request request = new Request();
@@ -99,7 +87,7 @@
      * @throws Exception
      * @since 4.2
      */
-    public void testProcessRequest() throws Exception {
+    @Test public void testProcessRequest() throws Exception {
         QueryMetadataInterface metadata = RealMetadataFactory.example1Cached();
         
         //Try before plan is cached.
@@ -115,7 +103,7 @@
         helpProcessMessage(message, null, workContext);
     }
     
-    public void testCommandContext() throws Exception {
+    @Test public void testCommandContext() throws Exception {
         QueryMetadataInterface metadata = RealMetadataFactory.example1Cached();
         
         //Try before plan is cached.
@@ -153,7 +141,7 @@
      * @throws Exception
      * @since 4.2
      */
-    public void testProcessRequestPreparedStatement() throws Exception {
+    @Test public void testProcessRequestPreparedStatement() throws Exception {
         QueryMetadataInterface metadata = RealMetadataFactory.example1Cached();
         SessionAwareCache<PreparedPlan> cache = new SessionAwareCache<PreparedPlan>();
         

Added: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java	                        (rev 0)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.dqp.internal.process;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Test;
+import org.teiid.common.buffer.BufferManagerFactory;
+import org.teiid.common.buffer.FileStore;
+import org.teiid.common.buffer.FileStoreInputStreamFactory;
+import org.teiid.core.types.InputStreamFactory;
+import org.teiid.core.types.Streamable;
+import org.teiid.core.types.InputStreamFactory.StorageMode;
+import org.teiid.core.util.ObjectConverterUtil;
+
+@SuppressWarnings("nls")
+public class TestSaveOnReadInputStream {
+	
+	@Test public void testSave() throws IOException {
+		SaveOnReadInputStream soris = getSaveOnReadInputStream();
+		InputStreamFactory isf = soris.getInputStreamFactory();
+		InputStream is = isf.getInputStream();
+		assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is), Streamable.CHARSET));
+		InputStream is2 = isf.getInputStream(); 
+		assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is2), Streamable.CHARSET));
+	}
+
+	@Test public void testPartialReadSave() throws IOException {
+		SaveOnReadInputStream soris = getSaveOnReadInputStream();
+		InputStreamFactory isf = soris.getInputStreamFactory();
+		InputStream is = isf.getInputStream();
+		is.read();
+		
+		InputStream is2 = isf.getInputStream(); 
+		assertEquals("ello world", new String(ObjectConverterUtil.convertToByteArray(is), Streamable.CHARSET));
+		assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is2), Streamable.CHARSET));
+		InputStream is3 = isf.getInputStream(); 
+		assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is3), Streamable.CHARSET));
+	}
+	
+	@Test public void testStorageMode() throws IOException {
+		SaveOnReadInputStream soris = getSaveOnReadInputStream();
+		InputStreamFactory isf = soris.getInputStreamFactory();
+		
+		assertEquals(StorageMode.MEMORY, isf.getStorageMode());
+		
+		InputStream is = isf.getInputStream();
+		assertEquals("hello world", new String(ObjectConverterUtil.convertToByteArray(is), Streamable.CHARSET));
+	}
+
+	private SaveOnReadInputStream getSaveOnReadInputStream() {
+		FileStore fs = BufferManagerFactory.getStandaloneBufferManager().createFileStore("test");
+		FileStoreInputStreamFactory factory = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
+		
+		InputStream is = new ByteArrayInputStream("hello world".getBytes(Streamable.CHARSET));
+		
+		SaveOnReadInputStream soris = new SaveOnReadInputStream(is, factory);
+		return soris;
+	}
+
+}


Property changes on: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestSaveOnReadInputStream.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java	2012-08-15 18:44:50 UTC (rev 4327)
+++ trunk/runtime/src/main/java/org/teiid/runtime/EmbeddedServer.java	2012-08-16 13:32:05 UTC (rev 4328)
@@ -37,7 +37,6 @@
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.RollbackException;
 import javax.transaction.Synchronization;



More information about the teiid-commits mailing list