Author: shawkins
Date: 2011-09-09 15:04:45 -0400 (Fri, 09 Sep 2011)
New Revision: 3462
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/build/kits/jboss-container/teiid-releasenotes.html
trunk/common-core/src/main/java/org/teiid/core/types/BaseLob.java
trunk/common-core/src/main/java/org/teiid/core/types/BlobImpl.java
trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java
trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java
trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
trunk/common-core/src/main/java/org/teiid/core/types/SQLXMLImpl.java
trunk/common-core/src/main/java/org/teiid/core/types/Streamable.java
trunk/common-core/src/main/java/org/teiid/core/types/XMLType.java
trunk/common-core/src/main/resources/org/teiid/core/util/application.properties
trunk/common-core/src/test/java/org/teiid/core/types/TestBlobValue.java
trunk/common-core/src/test/java/org/teiid/core/types/TestClobValue.java
trunk/common-core/src/test/java/org/teiid/core/types/TestXMLValue.java
trunk/documentation/caching-guide/src/main/docbook/en-US/content/hint-option.xml
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-1750 adding more memory tracking for soft references and allowing for lob inlining.
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-09-09
19:04:45 UTC (rev 3462)
@@ -43,7 +43,11 @@
<!-- Max storage space, in MB, to be used for buffer files (default 50G)
-->
<property name="maxBufferSpace">51200</property>
<!-- Max open buffer files (default 64) -->
- <property name="maxOpenFiles">64</property>
+ <property name="maxOpenFiles">64</property>
+ <!-- Set to true to allow inlining of memory based and small lobs into
results.
+ However inline lob values are not supported by pre-7.6 clients, so disable
this
+ property if using older clients utilizing lobs. (default true) -->
+ <property name="inlineLobs">true</property>
</bean>
<bean name="CacheFactory"
class="org.teiid.cache.jboss.ClusterableCacheFactory">
Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-09-09 17:10:27 UTC (rev
3461)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-09-09 19:04:45 UTC (rev
3462)
@@ -30,6 +30,7 @@
<LI><B>File Enhancements</B> - the file translator can now optionally
(via the ExceptionIfFileNotFound property) throw an exception if the path refers to a file
that doesn't exist. The file resource adapter can be configured to map file names and
can prevent parent path .. references. See the Admin Guide or the file-ds.xml template
for more.
<LI><B>TEXTTABLE Enhancements</B> - TEXTTABLE can now parse fixed
width files that do not use a row delimiter and can optionally produce fixed values that
haven't been trimmed.
<LI><B>Temp table transactions</B> - Internal materialized views and
temp table usage from a session and within procedures can take advantage of greater
transaction support.
+ <LI><B>Buffering Improvements</B> - Added the ability to inline
memory based or small lobs and added tracking of the memory held by soft references.
</UL>
<h2><a name="Compatibility">Compatibility
Issues</a></h2>
@@ -39,9 +40,10 @@
<li>Support for using the FROM clause post item hints MAKEDEP/MAKENOTDEP has been
deprecated. Use the pre item comment hint syntax instead, e.g. /*+ MAKEDEP */ tbl
</ul>
-<h4>from 7.4</h4>
+<h4>from 7.5</h4>
<ul>
- <li>Leave was added as a reserved word.
+ <li>Leave was added as a reserved word.
+ <li>Lob inlining is incompatible with clients older than 7.6. If clients older
than 7.6 that use lobs will connect to a 7.6 server, then the BufferService property
inlineLobs should be set to false in the teiid-jboss-beans.xml file.
</ul>
<h4>from 7.4</h4>
Modified: trunk/common-core/src/main/java/org/teiid/core/types/BaseLob.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/BaseLob.java 2011-09-09 17:10:27
UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/BaseLob.java 2011-09-09 19:04:45
UTC (rev 3462)
@@ -22,6 +22,7 @@
package org.teiid.core.types;
+import java.io.BufferedInputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
@@ -75,17 +76,8 @@
this.charset = charset;
}
- public void free() throws SQLException {
- if (this.streamFactory != null) {
- try {
- this.streamFactory.free();
- this.streamFactory = null;
- } catch (IOException e) {
- SQLException ex = new SQLException(e.getMessage());
- ex.initCause(e);
- throw ex;
- }
- }
+ public void free() {
+ this.streamFactory = null;
}
public Reader getCharacterStream() throws SQLException {
@@ -126,5 +118,35 @@
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(streamFactory);
}
+
+ /**
+ * Returns the number of bytes.
+ */
+ public long length() throws SQLException{
+ if (getStreamFactory().getLength() == -1) {
+ getStreamFactory().setLength(length(getBinaryStream()));
+ }
+ return getStreamFactory().getLength();
+ }
+ static long length(InputStream is) throws SQLException {
+ if (!(is instanceof BufferedInputStream)) {
+ is = new BufferedInputStream(is);
+ }
+ try {
+ long length = 0;
+ while (is.read() != -1) {
+ length++;
+ }
+ return length;
+ } catch (IOException e) {
+ throw new SQLException(e);
+ } finally {
+ try {
+ is.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/BlobImpl.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/BlobImpl.java 2011-09-09 17:10:27
UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/BlobImpl.java 2011-09-09 19:04:45
UTC (rev 3462)
@@ -22,7 +22,6 @@
package org.teiid.core.types;
-import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -109,32 +108,6 @@
}
/**
- * Returns the number of bytes in the <code>BLOB</code> value
- * designated by this <code>Blob</code> object.
- * @return length of the <code>BLOB</code> in bytes
- */
- public long length() throws SQLException{
- if (getStreamFactory().getLength() == -1) {
- InputStream is = new BufferedInputStream(getBinaryStream());
- try {
- long length = 0;
- while (is.read() != -1) {
- length++;
- }
- getStreamFactory().setLength(length);
- } catch (IOException e) {
- throw new SQLException(e);
- } finally {
- try {
- is.close();
- } catch (IOException e) {
- }
- }
- }
- return getStreamFactory().getLength();
- }
-
- /**
* Determines the byte position in the <code>BLOB</code> value
* designated by this <code>Blob</code> object at which
* <code>pattern</code> begins. The search begins at position
Modified: trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java 2011-09-09 17:10:27
UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java 2011-09-09 19:04:45
UTC (rev 3462)
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.sql.Blob;
@@ -32,6 +33,7 @@
import javax.sql.rowset.serial.SerialBlob;
import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectConverterUtil;
/**
@@ -61,18 +63,10 @@
public byte[] getBytes(long pos, int length) throws SQLException {
return this.reference.getBytes(pos, length);
}
-
- /**
- * @see java.sql.Blob#length()
- */
- public long length() throws SQLException {
- //caching the length
- if (this.length != -1) {
- return this.length;
- }
- // if did not find before then do it again.
- this.length = this.reference.length();
- return length;
+
+ @Override
+ long computeLength() throws SQLException {
+ return this.reference.length();
}
/**
@@ -142,12 +136,38 @@
}
@Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ protected void readReference(ObjectInput in) throws IOException {
+ byte[] bytes = new byte[(int)getLength()];
+ in.readFully(bytes);
try {
- length();
+ this.reference = new SerialBlob(bytes);
} catch (SQLException e) {
+ throw new IOException(e);
}
- super.writeExternal(out);
}
+ @Override
+ protected void writeReference(final ObjectOutput out) throws IOException {
+ try {
+ writeBinary(out, getBinaryStream(), (int)length);
+ } catch (SQLException e) {
+ throw new IOException();
+ }
+ }
+
+ static void writeBinary(final ObjectOutput out, InputStream is, int length) throws
IOException {
+ OutputStream os = new OutputStream() {
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+ };
+ try {
+ ObjectConverterUtil.write(os, is, length, false);
+ } finally {
+ is.close();
+ }
+ }
+
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java 2011-09-09 17:10:27
UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java 2011-09-09 19:04:45
UTC (rev 3462)
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.Reader;
@@ -35,6 +36,7 @@
import javax.sql.rowset.serial.SerialClob;
import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectConverterUtil;
/**
@@ -72,17 +74,10 @@
public String getSubString(long pos, int length) throws SQLException {
return this.reference.getSubString(pos, length);
}
-
- /**
- * @see java.sql.Clob#length()
- */
- public long length() throws SQLException {
- if (this.length != -1) {
- return this.length;
- }
-
- this.length = this.reference.length();
- return length;
+
+ @Override
+ long computeLength() throws SQLException {
+ return this.reference.length();
}
/**
@@ -218,12 +213,52 @@
}
@Override
- public void writeExternal(ObjectOutput out) throws IOException {
+ protected void readReference(ObjectInput in) throws IOException {
+ char[] chars = new char[(int)length];
+ for (int i = 0; i < chars.length; i++) {
+ chars[i] = in.readChar();
+ }
try {
- length();
+ this.reference = new SerialClob(chars);
} catch (SQLException e) {
+ throw new IOException(e);
}
- super.writeExternal(out);
}
+
+ /**
+ * Since we have the length in chars we'll just write out in double byte format.
+ * These clobs should be small, so the wasted space should be minimal.
+ */
+ @Override
+ protected void writeReference(final ObjectOutput out) throws IOException {
+ Writer w = new Writer() {
+
+ @Override
+ public void write(char[] cbuf, int off, int len) throws IOException {
+ for (int i = off; i < len; i++) {
+ out.writeShort(cbuf[i]);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+ };
+ Reader r;
+ try {
+ r = getCharacterStream();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ try {
+ ObjectConverterUtil.write(w, r, (int)length, false);
+ } finally {
+ r.close();
+ }
+ }
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
===================================================================
---
trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java 2011-09-09
17:10:27 UTC (rev 3461)
+++
trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -36,12 +36,21 @@
import java.sql.SQLXML;
import javax.activation.DataSource;
+import javax.sql.rowset.serial.SerialBlob;
+import javax.sql.rowset.serial.SerialClob;
import javax.xml.transform.Source;
import org.teiid.core.util.ReaderInputStream;
public abstract class InputStreamFactory implements Source {
+ public enum StorageMode {
+ MEMORY, //TODO: sources may return Serial values that are much too large and we should
convert them to persistent.
+ PERSISTENT,
+ FREE,
+ OTHER
+ }
+
public interface StreamFactoryReference {
void setStreamFactory(InputStreamFactory inputStreamFactory);
@@ -49,7 +58,7 @@
}
private String systemId;
- private long length = -1;
+ protected long length = -1;
/**
* Get a new InputStream
@@ -67,10 +76,17 @@
this.systemId = systemId;
}
+ /**
+ * @throws IOException
+ */
public void free() throws IOException {
}
+ /**
+ * Length in bytes of the {@link InputStream}
+ * @return the length or -1 if the length is not known
+ */
public long getLength() {
return length;
}
@@ -79,12 +95,15 @@
this.length = length;
}
+ /**
+ * @throws IOException
+ */
public Reader getCharacterStream() throws IOException {
return null;
}
- public boolean isPersistent() {
- return false;
+ public StorageMode getStorageMode() {
+ return StorageMode.OTHER;
}
public static class FileInputStreamFactory extends InputStreamFactory {
@@ -107,8 +126,8 @@
}
@Override
- public boolean isPersistent() {
- return true;
+ public StorageMode getStorageMode() {
+ return StorageMode.PERSISTENT;
}
}
@@ -162,6 +181,11 @@
public OutputStream getOutputStream() throws IOException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public StorageMode getStorageMode() {
+ return getStorageMode(clob);
+ }
}
@@ -184,11 +208,13 @@
@Override
public long getLength() {
- try {
- return blob.length();
- } catch (SQLException e) {
- return -1;
- }
+ if (length == -1) {
+ try {
+ length = blob.length();
+ } catch (SQLException e) {
+ }
+ }
+ return length;
}
@Override
@@ -205,9 +231,35 @@
public OutputStream getOutputStream() throws IOException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public StorageMode getStorageMode() {
+ return getStorageMode(blob);
+ }
}
+ public static StorageMode getStorageMode(Object lob) {
+ if (lob instanceof Streamable<?>) {
+ return getStorageMode(((Streamable<?>)lob).getReference());
+ }
+ if (lob instanceof SerialClob) {
+ return StorageMode.MEMORY;
+ }
+ if (lob instanceof SerialBlob) {
+ return StorageMode.MEMORY;
+ }
+ if (lob instanceof BaseLob) {
+ BaseLob baseLob = (BaseLob)lob;
+ try {
+ return baseLob.getStreamFactory().getStorageMode();
+ } catch (SQLException e) {
+ return StorageMode.FREE;
+ }
+ }
+ return StorageMode.OTHER;
+ }
+
public static class SQLXMLInputStreamFactory extends InputStreamFactory implements
DataSource {
private SQLXML sqlxml;
@@ -248,6 +300,11 @@
public OutputStream getOutputStream() throws IOException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public StorageMode getStorageMode() {
+ return getStorageMode(sqlxml);
+ }
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/SQLXMLImpl.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/SQLXMLImpl.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/SQLXMLImpl.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -71,6 +71,16 @@
public InputStream getInputStream() throws IOException {
return new ByteArrayInputStream(bytes);
}
+
+ @Override
+ public StorageMode getStorageMode() {
+ return StorageMode.MEMORY;
+ }
+
+ @Override
+ public long getLength() {
+ return bytes.length;
+ }
});
setEncoding(Streamable.ENCODING);
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/Streamable.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/Streamable.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/Streamable.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -27,7 +27,8 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.charset.Charset;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicLong;
import org.teiid.core.CorePlugin;
@@ -44,7 +45,7 @@
private static final long serialVersionUID = -8252488562134729374L;
- private static AtomicInteger counter = new AtomicInteger();
+ private static AtomicLong counter = new AtomicLong();
public static final String ENCODING = "UTF-8"; //$NON-NLS-1$
public static final Charset CHARSET = Charset.forName(ENCODING);
@@ -66,10 +67,23 @@
this.reference = reference;
}
+ /**
+ * Returns the cached length. May be binary or character based.
+ * @return
+ */
public long getLength() {
return length;
}
+ abstract long computeLength() throws SQLException;
+
+ public long length() throws SQLException {
+ if (length == -1) {
+ length = computeLength();
+ }
+ return length;
+ }
+
public T getReference() {
return reference;
}
@@ -82,9 +96,9 @@
return this.referenceStreamId;
}
- /*public void setReferenceStreamId(String id) {
+ public void setReferenceStreamId(String id) {
this.referenceStreamId = id;
- }*/
+ }
@Override
public String toString() {
@@ -99,12 +113,27 @@
ClassNotFoundException {
length = in.readLong();
referenceStreamId = (String)in.readObject();
+ if (referenceStreamId == null) {
+ //we expect the data inline
+ readReference(in);
+ }
}
+ protected abstract void readReference(ObjectInput in) throws IOException;
+
@Override
public void writeExternal(ObjectOutput out) throws IOException {
+ try {
+ length();
+ } catch (SQLException e) {
+ }
out.writeLong(length);
out.writeObject(referenceStreamId);
+ if (referenceStreamId == null) {
+ writeReference(out);
+ }
}
-
+
+ protected abstract void writeReference(ObjectOutput out) throws IOException;
+
}
Modified: trunk/common-core/src/main/java/org/teiid/core/types/XMLType.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/XMLType.java 2011-09-09 17:10:27
UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/XMLType.java 2011-09-09 19:04:45
UTC (rev 3462)
@@ -189,4 +189,29 @@
}
}
}
+
+ @Override
+ long computeLength() throws SQLException {
+ if (this.reference instanceof SQLXMLImpl) {
+ SQLXMLImpl impl = (SQLXMLImpl)this.reference;
+ return impl.length();
+ }
+ return BaseLob.length(getBinaryStream());
+ }
+
+ @Override
+ protected void readReference(ObjectInput in) throws IOException {
+ byte[] bytes = new byte[(int)getLength()];
+ in.readFully(bytes);
+ this.reference = new SQLXMLImpl(bytes);
+ }
+
+ @Override
+ protected void writeReference(final ObjectOutput out) throws IOException {
+ try {
+ BlobType.writeBinary(out, getBinaryStream(), (int)length);
+ } catch (SQLException e) {
+ throw new IOException();
+ }
+ }
}
Modified: trunk/common-core/src/main/resources/org/teiid/core/util/application.properties
===================================================================
---
trunk/common-core/src/main/resources/org/teiid/core/util/application.properties 2011-09-09
17:10:27 UTC (rev 3461)
+++
trunk/common-core/src/main/resources/org/teiid/core/util/application.properties 2011-09-09
19:04:45 UTC (rev 3462)
@@ -1,5 +1,5 @@
-build.releaseNumber=${pom.version}
-build.number=${pom.version}
+build.releaseNumber=${project.version}
+build.number=${project.version}
build.date=@build-date@
copyright=Copyright (C) 2008-2009 Red Hat, Inc
url=${site.url}
\ No newline at end of file
Modified: trunk/common-core/src/test/java/org/teiid/core/types/TestBlobValue.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/TestBlobValue.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/test/java/org/teiid/core/types/TestBlobValue.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -22,21 +22,14 @@
package org.teiid.core.types;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
import javax.sql.rowset.serial.SerialBlob;
-import org.teiid.core.types.BlobType;
-import org.teiid.core.util.UnitTestUtil;
-
-
import junit.framework.TestCase;
+import org.junit.Test;
+import org.teiid.core.util.UnitTestUtil;
+
public class TestBlobValue extends TestCase {
public void testBlobValue() throws Exception {
@@ -55,22 +48,27 @@
String key = bv.getReferenceStreamId();
// now force to serialize
- File saved = new
File(UnitTestUtil.getTestScratchPath()+"/blobassaved.bin"); //$NON-NLS-1$
- ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(saved));
- out.writeObject(bv);
- out.close();
-
- // now read back the object from serilized state
- ObjectInputStream in = new ObjectInputStream(new FileInputStream(saved));
- BlobType read = (BlobType)in.readObject();
+ BlobType read = UnitTestUtil.helpSerialize(bv);
// make sure we have kept the reference stream id
assertEquals(key, read.getReferenceStreamId());
// and lost the original object
assertNull(read.getReference());
+ }
+
+ @Test public void testReferencePersistence() throws Exception {
+ String testString = "this is test clob"; //$NON-NLS-1$
+ SerialBlob blob = new SerialBlob(testString.getBytes());
- saved.delete();
+ BlobType bv = new BlobType(blob);
+ bv.setReferenceStreamId(null);
+ // now force to serialize
+ BlobType read = UnitTestUtil.helpSerialize(bv);
+
+ assertNull(read.getReferenceStreamId());
+
+ assertEquals(testString, new String(read.getBytes(1, (int)blob.length())));
}
}
Modified: trunk/common-core/src/test/java/org/teiid/core/types/TestClobValue.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/TestClobValue.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/test/java/org/teiid/core/types/TestClobValue.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -24,12 +24,7 @@
import static org.junit.Assert.*;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Reader;
import javax.sql.rowset.serial.SerialClob;
@@ -56,15 +51,8 @@
String key = cv.getReferenceStreamId();
// now force to serialize
- File saved = new
File(UnitTestUtil.getTestScratchPath()+"/clobassaved.bin"); //$NON-NLS-1$
- ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(saved));
- out.writeObject(cv);
- out.close();
+ ClobType read = UnitTestUtil.helpSerialize(cv);
- // now read back the object from serilized state
- ObjectInputStream in = new ObjectInputStream(new FileInputStream(saved));
- ClobType read = (ClobType)in.readObject();
-
assertTrue(read.length() > 0);
// make sure we have kept the reference stream id
@@ -72,8 +60,21 @@
// and lost the original object
assertNull(read.getReference());
+ }
+
+ @Test public void testReferencePersistence() throws Exception {
+ String testString = "this is test clob"; //$NON-NLS-1$
+ SerialClob clob = new SerialClob(testString.toCharArray());
- saved.delete();
+ ClobType cv = new ClobType(clob);
+ cv.setReferenceStreamId(null);
+
+ // now force to serialize
+ ClobType read = UnitTestUtil.helpSerialize(cv);
+
+ assertTrue(read.length() > 0);
+
+ assertEquals(testString, read.getSubString(1, testString.length()));
}
@Test public void testClobSubstring() throws Exception {
Modified: trunk/common-core/src/test/java/org/teiid/core/types/TestXMLValue.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/TestXMLValue.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/test/java/org/teiid/core/types/TestXMLValue.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -22,20 +22,11 @@
package org.teiid.core.types;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import junit.framework.TestCase;
-import org.teiid.core.types.SQLXMLImpl;
-import org.teiid.core.types.XMLType;
import org.teiid.core.util.UnitTestUtil;
-import junit.framework.TestCase;
-
-
public class TestXMLValue extends TestCase {
public void testXMLValue() throws Exception {
@@ -55,22 +46,26 @@
String key = xv.getReferenceStreamId();
// now force to serialize
- File saved = new
File(UnitTestUtil.getTestScratchPath()+"/xmlsaved.bin"); //$NON-NLS-1$
- ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(saved));
- out.writeObject(xv);
- out.close();
-
- // now read back the object from serilized state
- ObjectInputStream in = new ObjectInputStream(new FileInputStream(saved));
- XMLType read = (XMLType)in.readObject();
+ XMLType read = UnitTestUtil.helpSerialize(xv);
// make sure we have kept the reference stream id
assertEquals(key, read.getReferenceStreamId());
// and lost the original object
assertNull(read.getReference());
+ }
+
+ public void testReferencePersistence() throws Exception {
+ String testString = "<foo>this is an xml value test</foo>";
//$NON-NLS-1$
+ SQLXMLImpl xml = new SQLXMLImpl(testString);
- saved.delete();
+ XMLType xv = new XMLType(xml);
+ xv.setReferenceStreamId(null);
+
+ // now force to serialize
+ XMLType read = UnitTestUtil.helpSerialize(xv);
+
+ assertEquals(testString, read.getString());
}
}
Modified:
trunk/documentation/caching-guide/src/main/docbook/en-US/content/hint-option.xml
===================================================================
---
trunk/documentation/caching-guide/src/main/docbook/en-US/content/hint-option.xml 2011-09-09
17:10:27 UTC (rev 3461)
+++
trunk/documentation/caching-guide/src/main/docbook/en-US/content/hint-option.xml 2011-09-09
19:04:45 UTC (rev 3462)
@@ -19,6 +19,9 @@
<itemizedlist>
<listitem><para>The cache hint should appear at the beginning of the
SQL. It will not have any effect on INSERT/UPDATE/DELETE statements or virtual update
procedure definitions.</para></listitem>
<listitem><para><emphasis>pref_mem</emphasis> - if present
indicates that the cached results should prefer to remain in memory. They are not however
required to be memory only.
+ <note><para>Care should be taken to not overuse the pref_mem option.
+ The memory preference is implemented with Java soft references. While soft
references are effective at preventing out of memory conditions,
+ too much memory held by soft references can limit the effective working memory.
Consult your JVM options for clearing soft references if you need to tune their
behavior.</para></note>
</para></listitem>
<listitem><para><emphasis>ttl:n</emphasis> - if present n
indicates the time to live value in milliseconds.
</para></listitem>
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2011-09-09
17:10:27 UTC (rev 3461)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -56,6 +56,9 @@
@Override
public long getLength() {
+ if (fsos != null && !fsos.bytesWritten()) {
+ return fsos.getCount();
+ }
return lobBuffer.getLength();
}
@@ -86,7 +89,10 @@
}
@Override
- public boolean isPersistent() {
- return fsos == null || fsos.bytesWritten();
+ public StorageMode getStorageMode() {
+ if (fsos == null) {
+ return StorageMode.PERSISTENT;
+ }
+ return StorageMode.MEMORY;
}
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java 2011-09-09 17:10:27
UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java 2011-09-09 19:04:45
UTC (rev 3462)
@@ -45,6 +45,10 @@
import org.teiid.core.types.SQLXMLImpl;
import org.teiid.core.types.Streamable;
import org.teiid.core.types.XMLType;
+import org.teiid.core.types.InputStreamFactory.BlobInputStreamFactory;
+import org.teiid.core.types.InputStreamFactory.ClobInputStreamFactory;
+import org.teiid.core.types.InputStreamFactory.SQLXMLInputStreamFactory;
+import org.teiid.core.types.InputStreamFactory.StorageMode;
import org.teiid.core.util.ObjectConverterUtil;
import org.teiid.query.QueryPlugin;
import org.teiid.query.sql.symbol.Expression;
@@ -55,6 +59,16 @@
*/
public class LobManager {
private Map<String, Streamable<?>> lobReferences = new
ConcurrentHashMap<String, Streamable<?>>();
+ private boolean inlineLobs = true;
+ private int maxMemoryBytes = DataTypeManager.MAX_LOB_MEMORY_BYTES;
+
+ public void setInlineLobs(boolean trackMemoryLobs) {
+ this.inlineLobs = trackMemoryLobs;
+ }
+
+ public void setMaxMemoryBytes(int maxMemoryBytes) {
+ this.maxMemoryBytes = maxMemoryBytes;
+ }
public void updateReferences(int[] lobIndexes, List<?> tuple)
throws TeiidComponentException {
@@ -64,6 +78,16 @@
continue;
}
Streamable lob = (Streamable) anObj;
+ try {
+ if (inlineLobs
+ && (InputStreamFactory.getStorageMode(lob) == StorageMode.MEMORY
+ || lob.length()*(lob instanceof ClobType?2:1) <= maxMemoryBytes)) {
+ lob.setReferenceStreamId(null);
+ continue;
+ }
+ } catch (SQLException e) {
+ //presumably the lob is bad, but let it slide for now
+ }
if (lob.getReference() == null) {
lob.setReference(getLobReference(lob.getReferenceStreamId()).getReference());
} else {
@@ -81,14 +105,14 @@
return lob;
}
- public static int[] getLobIndexes(List expressions) {
+ public static int[] getLobIndexes(List<? extends Expression> expressions) {
if (expressions == null) {
return null;
}
int[] result = new int[expressions.size()];
int resultIndex = 0;
for (int i = 0; i < expressions.size(); i++) {
- Expression expr = (Expression) expressions.get(i);
+ Expression expr = expressions.get(i);
if (DataTypeManager.isLOB(expr.getType()) || expr.getType() ==
DataTypeManager.DefaultDataClasses.OBJECT) {
result[resultIndex++] = i;
}
@@ -115,7 +139,7 @@
try {
BaseLob baseLob = (BaseLob)lob.getReference();
InputStreamFactory isf = baseLob.getStreamFactory();
- if (isf.isPersistent()) {
+ if (isf.getStorageMode() == StorageMode.PERSISTENT) {
return lob;
}
} catch (SQLException e) {
@@ -127,19 +151,15 @@
Streamable<?> persistedLob;
try {
- InputStreamFactory isf = new InputStreamFactory() {
- @Override
- public InputStream getInputStream() throws IOException {
- if (lob instanceof BlobType) {
- return new BlobInputStreamFactory((Blob)lob).getInputStream();
- }
- else if (lob instanceof ClobType) {
- return new ClobInputStreamFactory((Clob)lob).getInputStream();
- }
- return new SQLXMLInputStreamFactory((SQLXML)lob).getInputStream();
- }
- };
- InputStream is = isf.getInputStream();
+ InputStream is = null;
+ if (lob instanceof BlobType) {
+ is = new BlobInputStreamFactory((Blob)lob).getInputStream();
+ }
+ else if (lob instanceof ClobType) {
+ is = new ClobInputStreamFactory((Clob)lob).getInputStream();
+ } else {
+ is = new SQLXMLInputStreamFactory((SQLXML)lob).getInputStream();
+ }
OutputStream fsos = store.createOutputStream();
length = ObjectConverterUtil.write(fsos, is, bytes, -1);
} catch (IOException e) {
@@ -156,8 +176,8 @@
}
@Override
- public boolean isPersistent() {
- return true;
+ public StorageMode getStorageMode() {
+ return StorageMode.PERSISTENT;
}
};
@@ -178,4 +198,8 @@
}
return persistedLob;
}
+
+ public int getLobCount() {
+ return this.lobReferences.size();
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -94,6 +94,12 @@
this.batchSize = batchSize;
}
+ public void setInlineLobs(boolean inline) {
+ if (this.lobManager != null) {
+ this.lobManager.setInlineLobs(inline);
+ }
+ }
+
public String getId() {
if (this.uuid == null) {
this.uuid = java.util.UUID.randomUUID().toString();
@@ -373,4 +379,11 @@
return types;
}
+ public int getLobCount() {
+ if (this.lobManager == null) {
+ return 0;
+ }
+ return this.lobManager.getLobCount();
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-09
17:10:27 UTC (rev 3461)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -94,12 +94,12 @@
private final class CleanupHook implements
org.teiid.common.buffer.BatchManager.CleanupHook {
- private long id;
+ private Long id;
private WeakReference<BatchManagerImpl> ref;
- CleanupHook(long id, BatchManagerImpl batchManager) {
+ CleanupHook(Long id, WeakReference<BatchManagerImpl> batchManager) {
this.id = id;
- this.ref = new WeakReference<BatchManagerImpl>(batchManager);
+ this.ref = batchManager;
}
public void cleanup() {
@@ -120,6 +120,7 @@
private AtomicLong unusedSpace = new AtomicLong();
private int[] lobIndexes;
private SizeUtility sizeUtility;
+ private WeakReference<BatchManagerImpl> ref = new
WeakReference<BatchManagerImpl>(this);
private BatchManagerImpl(String newID, int[] lobIndexes) {
this.id = newID;
@@ -136,7 +137,7 @@
@Override
public ManagedBatch createManagedBatch(TupleBatch batch, boolean softCache)
throws TeiidComponentException {
- ManagedBatchImpl mbi = new ManagedBatchImpl(batch, this, softCache);
+ ManagedBatchImpl mbi = new ManagedBatchImpl(batch, ref, softCache);
mbi.addToCache(false);
persistBatchReferences();
return mbi;
@@ -196,6 +197,11 @@
public void remove() {
this.store.remove();
}
+
+ @Override
+ public String toString() {
+ return id;
+ }
}
/**
@@ -205,10 +211,18 @@
TreeMap<Long, ManagedBatchImpl> batches = new TreeMap<Long,
ManagedBatchImpl>();
Long lastUsed = null;
- ManagedBatchImpl removeBatch(long row) {
+ ManagedBatchImpl removeBatch(Long row) {
ManagedBatchImpl result = batches.remove(row);
if (result != null) {
activeBatchKB -= result.sizeEstimate;
+ if (result.softCache) {
+ BatchSoftReference ref = (BatchSoftReference)result.batchReference;
+ if (ref != null) {
+ maxReserveKB += ref.sizeEstimate;
+ ref.sizeEstimate = 0;
+ ref.clear();
+ }
+ }
}
return result;
}
@@ -220,21 +234,22 @@
private volatile TupleBatch activeBatch;
private volatile Reference<TupleBatch> batchReference;
private int beginRow;
- private BatchManagerImpl batchManager;
- private long id;
+ private WeakReference<BatchManagerImpl> managerRef;
+ private Long id;
private LobManager lobManager;
private int sizeEstimate;
- public ManagedBatchImpl(TupleBatch batch, BatchManagerImpl manager, boolean softCache)
{
+ public ManagedBatchImpl(TupleBatch batch, WeakReference<BatchManagerImpl> ref,
boolean softCache) {
this.softCache = softCache;
id = batchAdded.incrementAndGet();
this.activeBatch = batch;
this.beginRow = batch.getBeginRow();
- this.batchManager = manager;
- if (this.batchManager.lobIndexes != null) {
+ this.managerRef = ref;
+ BatchManagerImpl batchManager = ref.get();
+ if (batchManager.lobIndexes != null) {
this.lobManager = new LobManager();
}
- sizeEstimate = (int) Math.max(1, manager.sizeUtility.getBatchSize(batch) / 1024);
+ sizeEstimate = (int) Math.max(1, batchManager.sizeUtility.getBatchSize(batch) /
1024);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE))
{
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", id, "with size estimate", sizeEstimate); //$NON-NLS-1$ //$NON-NLS-2$
}
@@ -247,6 +262,11 @@
}
private void addToCache(boolean update) {
+ BatchManagerImpl batchManager = managerRef.get();
+ if (batchManager == null) {
+ remove();
+ return;
+ }
synchronized (activeBatches) {
TupleBatch batch = this.activeBatch;
if (batch == null) {
@@ -272,11 +292,25 @@
@Override
public TupleBatch getBatch(boolean cache, String[] types) throws
TeiidComponentException {
+ BatchManagerImpl batchManager = managerRef.get();
+ if (batchManager == null) {
+ remove();
+ throw new AssertionError("Already removed"); //$NON-NLS-1$
+ }
long reads = readAttempts.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE))
{
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, "getting batch", reads, "reference hits", referenceHit.get()); //$NON-NLS-1$ //$NON-NLS-2$
}
synchronized (activeBatches) {
+ for (int i = 0; i < 10; i++) {
+ BatchSoftReference ref = (BatchSoftReference)SOFT_QUEUE.poll();
+ if (ref == null) {
+ break;
+ }
+ maxReserveKB += ref.sizeEstimate;
+ ref.sizeEstimate = 0;
+ ref.clear();
+ }
TupleBufferInfo tbi = activeBatches.remove(batchManager.id);
if (tbi != null) {
boolean put = true;
@@ -316,7 +350,7 @@
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "reading batch from disk, total reads:", count); //$NON-NLS-1$
}
try {
- this.batchManager.compactionLock.readLock().lock();
+ batchManager.compactionLock.readLock().lock();
long[] info = batchManager.physicalMapping.get(this.id);
Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
ObjectInputStream ois = new ObjectInputStream(new
BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
@@ -340,12 +374,17 @@
} catch (ClassNotFoundException e) {
throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading",
batchManager.id)); //$NON-NLS-1$
} finally {
- this.batchManager.compactionLock.readLock().unlock();
+ batchManager.compactionLock.readLock().unlock();
}
}
}
public synchronized void persist() throws TeiidComponentException {
+ BatchManagerImpl batchManager = managerRef.get();
+ if (batchManager == null) {
+ remove();
+ return;
+ }
boolean lockheld = false;
try {
TupleBatch batch = activeBatch;
@@ -376,7 +415,10 @@
}
}
if (softCache) {
- this.batchReference = new SoftReference<TupleBatch>(batch);
+ this.batchReference = new BatchSoftReference(batch, SOFT_QUEUE, sizeEstimate);
+ synchronized (activeBatches) {
+ maxReserveKB -= sizeEstimate;
+ }
} else if (useWeakReferences) {
this.batchReference = new WeakReference<TupleBatch>(batch);
}
@@ -389,26 +431,39 @@
persistent = true;
activeBatch = null;
if (lockheld) {
- this.batchManager.compactionLock.writeLock().unlock();
+ batchManager.compactionLock.writeLock().unlock();
}
}
}
public void remove() {
- cleanupManagedBatch(batchManager, id);
+ cleanupManagedBatch(managerRef.get(), id);
}
@Override
public CleanupHook getCleanupHook() {
- return new CleanupHook(id, batchManager);
+ return new CleanupHook(id, managerRef);
}
@Override
public String toString() {
- return "ManagedBatch " + batchManager.id + " " + this.id + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ return "ManagedBatch " + managerRef.get() + " " + this.id + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
}
+ private static class BatchSoftReference extends SoftReference<TupleBatch> {
+
+ private int sizeEstimate;
+
+ public BatchSoftReference(TupleBatch referent,
+ ReferenceQueue<? super TupleBatch> q, int sizeEstimate) {
+ super(referent, q);
+ this.sizeEstimate = sizeEstimate;
+ }
+ }
+
+ private static ReferenceQueue<? super TupleBatch> SOFT_QUEUE = new
ReferenceQueue<TupleBatch>();
+
// Configuration
private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
@@ -419,6 +474,7 @@
private volatile int reserveBatchKB;
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a
hint to set the reserveBatchKB
private boolean useWeakReferences = true;
+ private boolean inlineLobs = true;
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
@@ -508,10 +564,11 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(tupleBuffer.getTypes()), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$
}
+ tupleBuffer.setInlineLobs(inlineLobs);
return tupleBuffer;
}
- private void cleanupManagedBatch(BatchManagerImpl batchManager, long id) {
+ private void cleanupManagedBatch(BatchManagerImpl batchManager, Long id) {
synchronized (activeBatches) {
TupleBufferInfo tbi = activeBatches.get(batchManager.id);
if (tbi != null && tbi.removeBatch(id) != null) {
@@ -730,4 +787,8 @@
this.useWeakReferences = useWeakReferences;
}
+ public void setInlineLobs(boolean inlineLobs) {
+ this.inlineLobs = inlineLobs;
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -112,9 +112,8 @@
} else if (type == DataTypeManager.DefaultDataClasses.BIG_DECIMAL) {
return isValueCacheEnabled?150:200;
}
- return 512; //assumes buffer overhead in the case of lobs
- //however the account for lobs is misleading as the lob
- //references are not actually removed from memory
+ return 512; //this is misleading for lobs
+ //most references are not actually removed from memory
}
/**
Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-09-09
17:10:27 UTC (rev 3461)
+++
trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -872,6 +872,12 @@
private void doneProducingBatches() {
this.doneProducingBatches = true;
+ //TODO: we could perform more tracking to know what source lobs are in use
+ if (this.resultsBuffer.getLobCount() == 0) {
+ for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
+ connectorRequest.fullyCloseSource();
+ }
+ }
dqpCore.finishProcessing(this);
}
Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-09-09
17:10:27 UTC (rev 3461)
+++
trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -125,8 +125,8 @@
}
@Override
- public void free() throws IOException {
- f.close();
+ public StorageMode getStorageMode() {
+ return StorageMode.PERSISTENT;
}
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -66,6 +66,7 @@
}));
LobManager lobManager = new LobManager();
+ lobManager.setMaxMemoryBytes(4);
lobManager.updateReferences(new int[] {0,1}, Arrays.asList(clob, blob));
lobManager.persist(fs);
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -107,6 +107,7 @@
x.setType(DataTypeManager.DefaultDataClasses.CLOB);
List<ElementSymbol> schema = Arrays.asList(x);
TupleBuffer tb = new TupleBuffer(new FakeBatchManager(), "x", schema,
LobManager.getLobIndexes(schema), 32); //$NON-NLS-1$
+ tb.setInlineLobs(false);
ClobType c = new ClobType(new SerialClob(new char[0]));
TupleBatch batch = new TupleBatch(1, new List[] {Arrays.asList(c)});
tb.addTupleBatch(batch, false);
Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -120,7 +120,9 @@
@Override
public BufferManager getBufferManager() {
- return BufferManagerFactory.createBufferManager();
+ BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+ bm.setInlineLobs(false);
+ return bm;
}
});
core.setCacheFactory(new DefaultCacheFactory());
Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-09-09
17:10:27 UTC (rev 3461)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-09-09
19:04:45 UTC (rev 3462)
@@ -65,6 +65,7 @@
private int maxProcessingKb = BufferManager.DEFAULT_MAX_PROCESSING_KB;
private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
+ private boolean inlineLobs = true;
private FileStorageManager fsm;
/**
@@ -144,6 +145,10 @@
public void setConnectorBatchSize(int size) {
this.connectorBatchSize = size;
}
+
+ public void setInlineLobs(boolean inlineLobs) {
+ this.inlineLobs = inlineLobs;
+ }
public File getBufferDirectory() {
return bufferDir;
@@ -152,6 +157,10 @@
public boolean isUseDisk() {
return this.useDisk;
}
+
+ public boolean isInlineLobs() {
+ return inlineLobs;
+ }
@ManagementProperty(description="The max row count of a batch sent internally within the query processor. Should be <= the connectorBatchSize. (default 256)")
public int getProcessorBatchSize() {