Author: shawkins
Date: 2011-04-14 12:19:52 -0400 (Thu, 14 Apr 2011)
New Revision: 3090
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
trunk/engine/src/main/java/org/teiid/query/function/aggregate/TextAgg.java
trunk/engine/src/main/java/org/teiid/query/function/aggregate/XMLAgg.java
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
Log:
TEIID-1556 better handling for xmlagg and reducing the memory held by associated buffers
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-04-13 20:34:39
UTC (rev 3089)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-04-14 16:19:52
UTC (rev 3090)
@@ -27,6 +27,7 @@
import java.io.OutputStream;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
+import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
@@ -46,6 +47,7 @@
private byte[] buffer;
private int count;
private boolean bytesWritten;
+ private boolean closed;
public FileStoreOutputStream(int size) {
this.buffer = new byte[size];
@@ -58,6 +60,7 @@
@Override
public void write(byte[] b, int off, int len) throws IOException {
+ checkOpen();
if (len > buffer.length) {
flushBuffer();
writeDirect(b, off, len);
@@ -86,12 +89,18 @@
}
public void flushBuffer() throws IOException {
+ checkOpen();
if (count > 0) {
writeDirect(buffer, 0, count);
count = 0;
}
}
+ /**
+ * Return the buffer. Can be null if closed and the underlying filestore
+ * has been written to.
+ * @return
+ */
public byte[] getBuffer() {
return buffer;
}
@@ -114,8 +123,21 @@
@Override
public void close() throws IOException {
flush();
+ closed = true;
+ if (bytesWritten) {
+ this.buffer = null;
+ } else {
+ //truncate
+ this.buffer = Arrays.copyOf(this.buffer, this.count);
+ }
}
+ private void checkOpen() {
+ if (closed) {
+ throw new IllegalStateException("Alread closed"); //$NON-NLS-1$
+ }
+ }
+
}
static class CleanupReference extends PhantomReference<Object> {
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2011-04-13
20:34:39 UTC (rev 3089)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2011-04-14
16:19:52 UTC (rev 3090)
@@ -59,10 +59,20 @@
return lobBuffer.getLength();
}
+ /**
+ * Returns a new writer instance that is backed by the shared output stream.
+ * Closing a writer will prevent further writes.
+ * @return
+ */
public Writer getWriter() {
return new OutputStreamWriter(getOuputStream(), Charset.forName(encoding));
}
+ /**
+ * The returned output stream is shared among all uses.
+ * Once closed, no further writing can occur.
+ * @return
+ */
public FileStoreOutputStream getOuputStream() {
if (fsos == null) {
fsos = lobBuffer.createOutputStream(DataTypeManager.MAX_LOB_MEMORY_BYTES);
@@ -77,6 +87,6 @@
@Override
public boolean isPersistent() {
- return true;
+ return fsos == null || fsos.bytesWritten();
}
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/query/function/aggregate/TextAgg.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/aggregate/TextAgg.java 2011-04-13
20:34:39 UTC (rev 3089)
+++ trunk/engine/src/main/java/org/teiid/query/function/aggregate/TextAgg.java 2011-04-14
16:19:52 UTC (rev 3090)
@@ -72,7 +72,7 @@
}
}, textLine.getDelimiter(), textLine.getQuote()));
}
- w.close();
+ w.flush();
return fisf;
} catch (IOException e) {
throw new TeiidProcessingException(e);
@@ -96,7 +96,7 @@
String in = (String)input;
Writer w = result.getWriter();
w.write(in);
- w.close();
+ w.flush();
} catch (IOException e) {
throw new TeiidProcessingException(e);
}
Modified: trunk/engine/src/main/java/org/teiid/query/function/aggregate/XMLAgg.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/function/aggregate/XMLAgg.java 2011-04-13
20:34:39 UTC (rev 3089)
+++ trunk/engine/src/main/java/org/teiid/query/function/aggregate/XMLAgg.java 2011-04-14
16:19:52 UTC (rev 3090)
@@ -27,7 +27,7 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.XMLType;
-import org.teiid.query.function.source.XMLSystemFunctions;
+import org.teiid.query.function.source.XMLSystemFunctions.XmlConcat;
import org.teiid.query.util.CommandContext;
/**
@@ -35,7 +35,8 @@
*/
public class XMLAgg extends AggregateFunction {
- private XMLType result;
+ private XMLType result;
+ private XmlConcat concat;
private CommandContext context;
public XMLAgg(CommandContext context) {
@@ -43,7 +44,8 @@
}
public void reset() {
- result = null;
+ concat = null;
+ result = null;
}
/**
@@ -52,13 +54,25 @@
* @see org.teiid.query.function.aggregate.AggregateFunction#addInputDirect(Object,
List)
*/
public void addInputDirect(Object input, List<?> tuple) throws
TeiidComponentException, TeiidProcessingException {
- result = XMLSystemFunctions.xmlConcat(context, result, input);
+ if (concat == null) {
+ concat = new XmlConcat(context.getBufferManager());
+ }
+ concat.addValue(input);
}
/**
+ * @throws TeiidProcessingException
+ * @throws TeiidComponentException
* @see org.teiid.query.function.aggregate.AggregateFunction#getResult()
*/
- public Object getResult() {
+ public Object getResult() throws TeiidComponentException, TeiidProcessingException {
+ if (result == null) {
+ if (concat == null) {
+ return null;
+ }
+ result = concat.close();
+ concat = null;
+ }
return result;
}
Modified:
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java 2011-04-13
20:34:39 UTC (rev 3089)
+++
trunk/engine/src/main/java/org/teiid/query/function/source/XMLSystemFunctions.java 2011-04-14
16:19:52 UTC (rev 3090)
@@ -386,27 +386,67 @@
return singleValue;
}
- XMLType result = new
XMLType(XMLSystemFunctions.saveToBufferManager(context.getBufferManager(), new
XMLTranslator() {
-
- @Override
- public void translate(Writer writer) throws TransformerException,
- IOException {
- try {
- XMLOutputFactory factory = getOutputFactory();
- XMLEventWriter eventWriter = factory.createXMLEventWriter(writer);
- XMLEventFactory eventFactory = XMLEventFactory.newInstance();
- convertValue(writer, eventWriter, eventFactory, xml);
- for (Object object : other) {
- convertValue(writer, eventWriter, eventFactory, object);
- }
- eventWriter.flush(); //woodstox needs a flush rather than a close
- } catch (XMLStreamException e) {
- throw new TransformerException(e);
- }
+ XmlConcat concat = new XmlConcat(context.getBufferManager());
+ concat.addValue(xml);
+ for (Object object : other) {
+ concat.addValue(object);
+ }
+ return concat.close();
+ }
+
+ public static class XmlConcat {
+ private XMLOutputFactory factory;
+ private XMLEventWriter eventWriter;
+ private XMLEventFactory eventFactory;
+ private Writer writer;
+ private FileStoreInputStreamFactory fsisf;
+ private FileStore fs;
+
+ public XmlConcat(BufferManager bm) throws TeiidProcessingException {
+ fs = bm.createFileStore("xml"); //$NON-NLS-1$
+ fsisf = new FileStoreInputStreamFactory(fs, Streamable.ENCODING);
+ writer = fsisf.getWriter();
+ factory = getOutputFactory();
+ try {
+ eventWriter = factory.createXMLEventWriter(writer);
+ } catch (XMLStreamException e) {
+ fs.remove();
+ throw new TeiidProcessingException(e);
}
- }));
- result.setType(Type.CONTENT);
- return result;
+ eventFactory = XMLEventFactory.newInstance();
+ }
+
+ public void addValue(Object object) throws TeiidProcessingException {
+ try {
+ convertValue(writer, eventWriter, eventFactory, object);
+ } catch (IOException e) {
+ fs.remove();
+ throw new TeiidProcessingException(e);
+ } catch (XMLStreamException e) {
+ fs.remove();
+ throw new TeiidProcessingException(e);
+ } catch (TransformerException e) {
+ fs.remove();
+ throw new TeiidProcessingException(e);
+ }
+ }
+
+ public XMLType close() throws TeiidProcessingException {
+ try {
+ eventWriter.flush();
+ writer.close();
+ } catch (XMLStreamException e) {
+ fs.remove();
+ throw new TeiidProcessingException(e);
+ } catch (IOException e) {
+ fs.remove();
+ throw new TeiidProcessingException(e);
+ }
+ XMLType result = new XMLType(new SQLXMLImpl(fsisf));
+ result.setType(Type.CONTENT);
+ return result;
+ }
+
}
private static XMLOutputFactory getOutputFactory() throws FactoryConfigurationError {