[teiid-commits] teiid SVN: r3090 - in trunk/engine/src/main/java/org/teiid: query/function/aggregate and 1 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Thu Apr 14 12:19:52 EDT 2011


Author: shawkins
Date: 2011-04-14 12:19:52 -0400 (Thu, 14 Apr 2011)
New Revision: 3090

Modified:
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
   trunk/engine/src/main/java/org/teiid/query/function/aggregate/TextAgg.java
   trunk/engine/src/main/java/org/teiid/query/function/aggregate/XMLAgg.java
   trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
Log:
TEIID-1556 better handling for xmlagg and reducing the memory held by associated buffers

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-04-13 20:34:39 UTC (rev 3089)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java	2011-04-14 16:19:52 UTC (rev 3090)
@@ -27,6 +27,7 @@
 import java.io.OutputStream;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.Set;
@@ -46,6 +47,7 @@
 		private byte[] buffer;
 		private int count;
 		private boolean bytesWritten;
+		private boolean closed;
 		
 		public FileStoreOutputStream(int size) {
 			this.buffer = new byte[size];
@@ -58,6 +60,7 @@
 
 		@Override
 		public void write(byte[] b, int off, int len) throws IOException {
+			checkOpen();
 			if (len > buffer.length) {
 				flushBuffer();
 				writeDirect(b, off, len);
@@ -86,12 +89,18 @@
 		}
 
 		public void flushBuffer() throws IOException {
+			checkOpen();
 			if (count > 0) {
 				writeDirect(buffer, 0, count);
 				count = 0;
 			}
 		}
 		
+		/**
+		 * Return the buffer.  Can be null if closed and the underlying filestore
+		 * has been writen to.
+		 * @return
+		 */
 		public byte[] getBuffer() {
 			return buffer;
 		}
@@ -114,8 +123,21 @@
 		@Override
 		public void close() throws IOException {
 			flush();
+			closed = true;
+			if (bytesWritten) {
+				this.buffer = null;
+			} else {
+				//truncate
+				this.buffer = Arrays.copyOf(this.buffer, this.count);
+			}
 		}
 		
+		private void checkOpen() {
+			if (closed) {
+				throw new IllegalStateException("Alread closed"); //$NON-NLS-1$
+			}
+		}
+		
 	}
 
 	static class CleanupReference extends PhantomReference<Object> {

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2011-04-13 20:34:39 UTC (rev 3089)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2011-04-14 16:19:52 UTC (rev 3090)
@@ -59,10 +59,20 @@
 		return lobBuffer.getLength();
 	}
 
+	/**
+	 * Returns a new writer instance that is backed by the shared output stream.
+	 * Closing a writer will prevent further writes.
+	 * @return
+	 */
 	public Writer getWriter() {
 		return new OutputStreamWriter(getOuputStream(), Charset.forName(encoding));
 	}
 	
+	/**
+	 * The returned output stream is shared among all uses.
+	 * Once closed no further writing can occur
+	 * @return
+	 */
 	public FileStoreOutputStream getOuputStream() {
 		if (fsos == null) {
 			fsos = lobBuffer.createOutputStream(DataTypeManager.MAX_LOB_MEMORY_BYTES);
@@ -77,6 +87,6 @@
 	
 	@Override
 	public boolean isPersistent() {
-		return true;
+		return fsos == null || fsos.bytesWritten();
 	}
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/query/function/aggregate/TextAgg.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/aggregate/TextAgg.java	2011-04-13 20:34:39 UTC (rev 3089)
+++ trunk/engine/src/main/java/org/teiid/query/function/aggregate/TextAgg.java	2011-04-14 16:19:52 UTC (rev 3090)
@@ -72,7 +72,7 @@
 					}
 				}, textLine.getDelimiter(), textLine.getQuote()));
 			}
-			w.close();
+			w.flush();
 			return fisf;
 		} catch (IOException e) {
 			throw new TeiidProcessingException(e);
@@ -96,7 +96,7 @@
     		String in = (String)input;
     		Writer w = result.getWriter();
     		w.write(in);
-			w.close();
+			w.flush();
 		} catch (IOException e) {
 			throw new TeiidProcessingException(e);
 		}

Modified: trunk/engine/src/main/java/org/teiid/query/function/aggregate/XMLAgg.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/aggregate/XMLAgg.java	2011-04-13 20:34:39 UTC (rev 3089)
+++ trunk/engine/src/main/java/org/teiid/query/function/aggregate/XMLAgg.java	2011-04-14 16:19:52 UTC (rev 3090)
@@ -27,7 +27,7 @@
 import org.teiid.core.TeiidComponentException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.XMLType;
-import org.teiid.query.function.source.XMLSystemFunctions;
+import org.teiid.query.function.source.XMLSystemFunctions.XmlConcat;
 import org.teiid.query.util.CommandContext;
 
 /**
@@ -35,7 +35,8 @@
  */
 public class XMLAgg extends AggregateFunction {
 
-    private XMLType result;
+	private XMLType result;
+	private XmlConcat concat;
     private CommandContext context;
     
     public XMLAgg(CommandContext context) {
@@ -43,7 +44,8 @@
 	}
 
     public void reset() {
-        result = null;
+    	concat = null;
+    	result = null;
     }
 
     /**
@@ -52,13 +54,25 @@
      * @see org.teiid.query.function.aggregate.AggregateFunction#addInputDirect(Object, List)
      */
     public void addInputDirect(Object input, List<?> tuple) throws TeiidComponentException, TeiidProcessingException {
-    	result = XMLSystemFunctions.xmlConcat(context, result, input);
+    	if (concat == null) {
+    		concat = new XmlConcat(context.getBufferManager());
+    	}
+    	concat.addValue(input);
     }
 
     /**
+     * @throws TeiidProcessingException 
+     * @throws TeiidComponentException 
      * @see org.teiid.query.function.aggregate.AggregateFunction#getResult()
      */
-    public Object getResult() {
+    public Object getResult() throws TeiidComponentException, TeiidProcessingException {
+    	if (result == null) {
+    		if (concat == null) {
+        		return null;
+    		}
+    		result = concat.close();
+    		concat = null;
+    	}
         return result;
     }
 

Modified: trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java	2011-04-13 20:34:39 UTC (rev 3089)
+++ trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java	2011-04-14 16:19:52 UTC (rev 3090)
@@ -386,27 +386,67 @@
 			return singleValue;
 		}
 		
-		XMLType result = new XMLType(XMLSystemFunctions.saveToBufferManager(context.getBufferManager(), new XMLTranslator() {
-			
-			@Override
-			public void translate(Writer writer) throws TransformerException,
-					IOException {
-				try {
-					XMLOutputFactory factory = getOutputFactory();
-					XMLEventWriter eventWriter = factory.createXMLEventWriter(writer);
-					XMLEventFactory eventFactory = XMLEventFactory.newInstance();
-					convertValue(writer, eventWriter, eventFactory, xml);
-					for (Object object : other) {
-						convertValue(writer, eventWriter, eventFactory, object);
-					}
-					eventWriter.flush(); //woodstox needs a flush rather than a close
-				} catch (XMLStreamException e) {
-					throw new TransformerException(e);
-				} 
+		XmlConcat concat = new XmlConcat(context.getBufferManager());
+		concat.addValue(xml);
+		for (Object object : other) {
+			concat.addValue(object);
+		}
+		return concat.close();
+	}
+	
+	public static class XmlConcat {
+		private XMLOutputFactory factory;
+		private XMLEventWriter eventWriter;
+		private XMLEventFactory eventFactory;
+		private Writer writer;
+		private FileStoreInputStreamFactory fsisf;
+		private FileStore fs;
+		
+		public XmlConcat(BufferManager bm) throws TeiidProcessingException {
+			fs = bm.createFileStore("xml"); //$NON-NLS-1$
+			fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
+		    writer = fsisf.getWriter();
+			factory = getOutputFactory();
+			try {
+				eventWriter = factory.createXMLEventWriter(writer);
+			} catch (XMLStreamException e) {
+				fs.remove();
+				throw new TeiidProcessingException(e);
 			}
-		}));
-		result.setType(Type.CONTENT);
-		return result;
+			eventFactory = XMLEventFactory.newInstance();
+		}
+		
+		public void addValue(Object object) throws TeiidProcessingException {
+			try {
+				convertValue(writer, eventWriter, eventFactory, object);
+			} catch (IOException e) {
+				fs.remove();
+				throw new TeiidProcessingException(e);
+			} catch (XMLStreamException e) {
+				fs.remove();
+				throw new TeiidProcessingException(e);
+			} catch (TransformerException e) {
+				fs.remove();
+				throw new TeiidProcessingException(e);
+			}
+		}
+		
+		public XMLType close() throws TeiidProcessingException {
+			try {
+				eventWriter.flush();
+				writer.close();
+			} catch (XMLStreamException e) {
+				fs.remove();
+				throw new TeiidProcessingException(e);
+			} catch (IOException e) {
+				fs.remove();
+				throw new TeiidProcessingException(e);
+			}
+	        XMLType result = new XMLType(new SQLXMLImpl(fsisf));
+	        result.setType(Type.CONTENT);
+	        return result;
+		}
+		
 	}
 	
 	private static XMLOutputFactory getOutputFactory() throws FactoryConfigurationError {



More information about the teiid-commits mailing list