[teiid-commits] teiid SVN: r3462 - in trunk: build/kits/jboss-container/deploy/teiid and 11 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Fri Sep 9 15:04:45 EDT 2011


Author: shawkins
Date: 2011-09-09 15:04:45 -0400 (Fri, 09 Sep 2011)
New Revision: 3462

Modified:
   trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
   trunk/build/kits/jboss-container/teiid-releasenotes.html
   trunk/common-core/src/main/java/org/teiid/core/types/BaseLob.java
   trunk/common-core/src/main/java/org/teiid/core/types/BlobImpl.java
   trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java
   trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java
   trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
   trunk/common-core/src/main/java/org/teiid/core/types/SQLXMLImpl.java
   trunk/common-core/src/main/java/org/teiid/core/types/Streamable.java
   trunk/common-core/src/main/java/org/teiid/core/types/XMLType.java
   trunk/common-core/src/main/resources/org/teiid/core/util/application.properties
   trunk/common-core/src/test/java/org/teiid/core/types/TestBlobValue.java
   trunk/common-core/src/test/java/org/teiid/core/types/TestClobValue.java
   trunk/common-core/src/test/java/org/teiid/core/types/TestXMLValue.java
   trunk/documentation/caching-guide/src/main/docbook/en-US/content/hint-option.xml
   trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
   trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
   trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
   trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
   trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
   trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-1750 adding more memory tracking for soft references and allowing for lob inlining.

Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2011-09-09 19:04:45 UTC (rev 3462)
@@ -43,7 +43,11 @@
         <!-- Max storage space, in MB, to be used for buffer files (default 50G) -->
         <property name="maxBufferSpace">51200</property>
         <!-- Max open buffer files (default 64) -->
-        <property name="maxOpenFiles">64</property> 
+        <property name="maxOpenFiles">64</property>
+        <!-- Set to true to allow inlining of memory based and small lobs into results. 
+             However inline lob values are not supported by pre-7.6 clients, so disable this 
+             property if using older clients utilizing lobs. (default true) -->
+        <property name="inlineLobs">true</property>
     </bean>
     
     <bean name="CacheFactory" class="org.teiid.cache.jboss.ClusterableCacheFactory">

Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html	2011-09-09 19:04:45 UTC (rev 3462)
@@ -30,6 +30,7 @@
   <LI><B>File Enhancements</B> - the file translator can now optionally (via the ExceptionIfFileNotFound property) throw an exception if the path refers to a file that doesn't exist.  The file resource adapter can be configured to map file names and can prevent parent path .. references.  See the Admin Guide or the file-ds.xml template for more.
   <LI><B>TEXTTABLE Enhancements</B> - TEXTTABLE can now parse fixed width files that do not use a row delimiter and can optionally produce fixed values that haven't been trimmed.
   <LI><B>Temp table transactions</B> - Internal materialized views and temp table usage from a session and within procedures can take advantage of greater transaction support.
+  <LI><B>Buffering Improvements</B> - Added the ability to inline memory based or small lobs and added tracking of the memory held by soft references.
 </UL>
 
 <h2><a name="Compatibility">Compatibility Issues</a></h2>
@@ -39,9 +40,10 @@
   <li>Support for using the FROM clause post item hints MAKEDEP/MAKENOTDEP has been deprecated.  Use the pre item comment hint syntax instead, e.g. /*+ MAKEDEP */ tbl
 </ul>
 
-<h4>from 7.4</h4>
+<h4>from 7.5</h4>
 <ul>
-  <li>Leave was added as a reserved word. 
+  <li>Leave was added as a reserved word.
+  <li>Lob inlining is incompatible with clients older than 7.6.  If a 7.6 server will have older clients that use lobs connect to it, then the BufferService property inlineLobs should be set to false in the teiid-jboss-beans.xml file.
 </ul>
 
 <h4>from 7.4</h4>

Modified: trunk/common-core/src/main/java/org/teiid/core/types/BaseLob.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/BaseLob.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/BaseLob.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -22,6 +22,7 @@
 
 package org.teiid.core.types;
 
+import java.io.BufferedInputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -75,17 +76,8 @@
 		this.charset = charset;
 	}
 	
-	public void free() throws SQLException {
-		if (this.streamFactory != null) {
-			try {
-				this.streamFactory.free();
-				this.streamFactory = null;
-			} catch (IOException e) {
-				SQLException ex = new SQLException(e.getMessage());
-				ex.initCause(e);
-				throw ex;
-			}
-		}
+	public void free() {
+		this.streamFactory = null;
 	}
 	
     public Reader getCharacterStream() throws SQLException {
@@ -126,5 +118,35 @@
     public void writeExternal(ObjectOutput out) throws IOException {
     	out.writeObject(streamFactory);
     }
+    
+    /**
+     * Returns the number of bytes.
+     */
+    public long length() throws SQLException{
+    	if (getStreamFactory().getLength() == -1) {
+    		getStreamFactory().setLength(length(getBinaryStream()));
+    	}
+        return getStreamFactory().getLength();
+    }
 
+	static long length(InputStream is) throws SQLException {
+		if (!(is instanceof BufferedInputStream)) {
+			is = new BufferedInputStream(is);
+		}
+		try {
+			long length = 0;
+			while (is.read() != -1) {
+				length++;
+			}
+			return length;
+		} catch (IOException e) {
+			throw new SQLException(e);
+		} finally {
+			try {
+				is.close();
+			} catch (IOException e) {
+			}
+		}
+	}
+
 }

Modified: trunk/common-core/src/main/java/org/teiid/core/types/BlobImpl.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/BlobImpl.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/BlobImpl.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -22,7 +22,6 @@
 
 package org.teiid.core.types;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -109,32 +108,6 @@
     }
 
     /**
-     * Returns the number of bytes in the <code>BLOB</code> value
-     * designated by this <code>Blob</code> object.
-     * @return length of the <code>BLOB</code> in bytes
-     */
-    public long length() throws SQLException{
-    	if (getStreamFactory().getLength() == -1) {
-    		InputStream is = new BufferedInputStream(getBinaryStream());
-			try {
-	    		long length = 0;
-	    		while (is.read() != -1) {
-	    			length++;
-	    		}
-				getStreamFactory().setLength(length);
-			} catch (IOException e) {
-				throw new SQLException(e);
-			} finally {
-				try {
-					is.close();
-				} catch (IOException e) {
-				}
-			}
-    	}
-        return getStreamFactory().getLength();
-    }
-
-    /**
      * Determines the byte position in the <code>BLOB</code> value
      * designated by this <code>Blob</code> object at which
      * <code>pattern</code> begins.  The search begins at position

Modified: trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/BlobType.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -24,6 +24,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.OutputStream;
 import java.sql.Blob;
@@ -32,6 +33,7 @@
 import javax.sql.rowset.serial.SerialBlob;
 
 import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectConverterUtil;
 
 
 /**
@@ -61,18 +63,10 @@
     public byte[] getBytes(long pos, int length) throws SQLException {
         return this.reference.getBytes(pos, length);
     }
-
-    /** 
-     * @see java.sql.Blob#length()
-     */
-    public long length() throws SQLException {
-        //caching the length
-        if (this.length != -1) {
-            return this.length;
-        }
-        // if did not find before then do it again.
-        this.length = this.reference.length();
-        return length;
+    
+    @Override
+    long computeLength() throws SQLException {
+        return this.reference.length();
     }
     
     /** 
@@ -142,12 +136,38 @@
 	}
 	
 	@Override
-	public void writeExternal(ObjectOutput out) throws IOException {
+	protected void readReference(ObjectInput in) throws IOException {
+		byte[] bytes = new byte[(int)getLength()];
+		in.readFully(bytes);
 		try {
-			length();
+			this.reference = new SerialBlob(bytes);
 		} catch (SQLException e) {
+			throw new IOException(e);
 		}
-		super.writeExternal(out);
 	}
 	
+	@Override
+	protected void writeReference(final ObjectOutput out) throws IOException {
+		try {
+			writeBinary(out, getBinaryStream(), (int)length);
+		} catch (SQLException e) {
+			throw new IOException();
+		}
+	}
+
+	static void writeBinary(final ObjectOutput out, InputStream is, int length) throws IOException {
+		OutputStream os = new OutputStream() {
+			
+			@Override
+			public void write(int b) throws IOException {
+				out.write(b);
+			}
+		};
+		try {
+			ObjectConverterUtil.write(os, is, length, false);
+		} finally {
+			is.close();
+		}
+	}
+	
 }

Modified: trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/ClobType.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -24,6 +24,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.OutputStream;
 import java.io.Reader;
@@ -35,6 +36,7 @@
 import javax.sql.rowset.serial.SerialClob;
 
 import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.util.ObjectConverterUtil;
 
 
 /**
@@ -72,17 +74,10 @@
     public String getSubString(long pos, int length) throws SQLException {
         return this.reference.getSubString(pos, length);
     }
-
-    /** 
-     * @see java.sql.Clob#length()
-     */
-    public long length() throws SQLException {
-        if (this.length != -1) {
-            return this.length;
-        }
-        
-        this.length = this.reference.length();
-        return length;
+    
+    @Override
+    long computeLength() throws SQLException {
+        return this.reference.length();
     }
 
     /** 
@@ -218,12 +213,52 @@
 	}
 	
 	@Override
-	public void writeExternal(ObjectOutput out) throws IOException {
+	protected void readReference(ObjectInput in) throws IOException {
+		char[] chars = new char[(int)length];
+		for (int i = 0; i < chars.length; i++) {
+			chars[i] = in.readChar();
+		}
 		try {
-			length();
+			this.reference = new SerialClob(chars);
 		} catch (SQLException e) {
+			throw new IOException(e);
 		}
-		super.writeExternal(out);
 	}
+	
+	/**
+	 * Since we have the length in chars we'll just write out in double byte format.
+	 * These clobs should be small, so the wasted space should be minimal.
+	 */
+	@Override
+	protected void writeReference(final ObjectOutput out) throws IOException {
+		Writer w = new Writer() {
+			
+			@Override
+			public void write(char[] cbuf, int off, int len) throws IOException {
+				for (int i = off; i < len; i++) {
+					out.writeShort(cbuf[i]);
+				}
+			}
+			
+			@Override
+			public void flush() throws IOException {
+			}
+			
+			@Override
+			public void close() throws IOException {
+			}
+		};
+		Reader r;
+		try {
+			r = getCharacterStream();
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+		try {
+			ObjectConverterUtil.write(w, r, (int)length, false);
+		} finally {
+			r.close();
+		}
+	}
 
 }

Modified: trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/InputStreamFactory.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -36,12 +36,21 @@
 import java.sql.SQLXML;
 
 import javax.activation.DataSource;
+import javax.sql.rowset.serial.SerialBlob;
+import javax.sql.rowset.serial.SerialClob;
 import javax.xml.transform.Source;
 
 import org.teiid.core.util.ReaderInputStream;
 
 public abstract class InputStreamFactory implements Source {
 	
+	public enum StorageMode {
+		MEMORY, //TODO: sources may return Serial values that are much too large and we should convert them to persistent.
+		PERSISTENT,
+		FREE,
+		OTHER
+	}
+	
 	public interface StreamFactoryReference {
 		
 		void setStreamFactory(InputStreamFactory inputStreamFactory);
@@ -49,7 +58,7 @@
 	}
 	
 	private String systemId;
-	private long length = -1;
+	protected long length = -1;
 	
     /**
      * Get a new InputStream
@@ -67,10 +76,17 @@
     	this.systemId = systemId;
     }
     
+    /**
+	 * @throws IOException  
+	 */
     public void free() throws IOException {
     	
     }
     
+    /**
+     * Length in bytes of the {@link InputStream}
+     * @return the length or -1 if the length is not known
+     */
     public long getLength() {
 		return length;
 	}
@@ -79,12 +95,15 @@
 		this.length = length;
 	}
     
+    /**
+	 * @throws IOException  
+	 */
     public Reader getCharacterStream() throws IOException {
     	return null;
     }
     
-    public boolean isPersistent() {
-    	return false;
+    public StorageMode getStorageMode() {
+    	return StorageMode.OTHER;
     }
     
     public static class FileInputStreamFactory extends InputStreamFactory {
@@ -107,8 +126,8 @@
     	}
     	
     	@Override
-    	public boolean isPersistent() {
-    		return true;
+    	public StorageMode getStorageMode() {
+    		return StorageMode.PERSISTENT;
     	}
     	
     }
@@ -162,6 +181,11 @@
 		public OutputStream getOutputStream() throws IOException {
 			throw new UnsupportedOperationException();
 		}
+		
+		@Override
+		public StorageMode getStorageMode() {
+			return getStorageMode(clob);
+		}
     	
     }
     
@@ -184,11 +208,13 @@
     	
     	@Override
     	public long getLength() {
-    		try {
-				return blob.length();
-			} catch (SQLException e) {
-				return -1;
-			}
+    		if (length == -1) {
+	    		try {
+					length = blob.length();
+				} catch (SQLException e) {
+				}
+    		}
+    		return length;
     	}
     	
     	@Override
@@ -205,9 +231,35 @@
 		public OutputStream getOutputStream() throws IOException {
 			throw new UnsupportedOperationException();
 		}
+		
+		@Override
+		public StorageMode getStorageMode() {
+			return getStorageMode(blob);
+		}
     	
     }
     
+    public static StorageMode getStorageMode(Object lob) {
+    	if (lob instanceof Streamable<?>) {
+    		return getStorageMode(((Streamable<?>)lob).getReference());
+    	}
+    	if (lob instanceof SerialClob) {
+    		return StorageMode.MEMORY;
+    	}
+    	if (lob instanceof SerialBlob) {
+    		return StorageMode.MEMORY;
+    	}
+		if (lob instanceof BaseLob) {
+			BaseLob baseLob = (BaseLob)lob;
+			try {
+				return baseLob.getStreamFactory().getStorageMode();
+			} catch (SQLException e) {
+				return StorageMode.FREE;
+			}
+		}
+		return StorageMode.OTHER;
+    }
+    
     public static class SQLXMLInputStreamFactory extends InputStreamFactory implements DataSource {
     	
     	private SQLXML sqlxml;
@@ -248,6 +300,11 @@
 		public OutputStream getOutputStream() throws IOException {
 			throw new UnsupportedOperationException();
 		}
+		
+		@Override
+		public StorageMode getStorageMode() {
+			return getStorageMode(sqlxml);
+		}
     	
     }
     

Modified: trunk/common-core/src/main/java/org/teiid/core/types/SQLXMLImpl.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/SQLXMLImpl.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/SQLXMLImpl.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -71,6 +71,16 @@
 			public InputStream getInputStream() throws IOException {
 				return new ByteArrayInputStream(bytes);
 			}
+			
+			@Override
+			public StorageMode getStorageMode() {
+				return StorageMode.MEMORY;
+			}
+			
+			@Override
+			public long getLength() {
+				return bytes.length;
+			}
 		});
     	setEncoding(Streamable.ENCODING);
 	}

Modified: trunk/common-core/src/main/java/org/teiid/core/types/Streamable.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/Streamable.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/Streamable.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -27,7 +27,8 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.nio.charset.Charset;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.teiid.core.CorePlugin;
 
@@ -44,7 +45,7 @@
 
 	private static final long serialVersionUID = -8252488562134729374L;
 	
-	private static AtomicInteger counter = new AtomicInteger();
+	private static AtomicLong counter = new AtomicLong();
 	
 	public static final String ENCODING = "UTF-8"; //$NON-NLS-1$
 	public static final Charset CHARSET = Charset.forName(ENCODING);
@@ -66,10 +67,23 @@
     	this.reference = reference;
     }
     
+    /**
+     * Returns the cached length.  May be binary or character based.
+     * @return
+     */
     public long getLength() {
 		return length;
 	}
     
+    abstract long computeLength() throws SQLException;
+    
+    public long length() throws SQLException {
+    	if (length == -1) {
+    		length = computeLength();
+    	}
+    	return length;
+    }
+    
     public T getReference() {
 		return reference;
 	}
@@ -82,9 +96,9 @@
         return this.referenceStreamId;
     }
     
-    /*public void setReferenceStreamId(String id) {
+    public void setReferenceStreamId(String id) {
         this.referenceStreamId = id;
-    }*/
+    }
     
     @Override
     public String toString() {
@@ -99,12 +113,27 @@
     		ClassNotFoundException {
     	length = in.readLong();
     	referenceStreamId = (String)in.readObject();
+    	if (referenceStreamId == null) {
+    		//we expect the data inline
+    		readReference(in);
+    	}
     }
     
+    protected abstract void readReference(ObjectInput in) throws IOException;
+    
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
+		try {
+			length();
+		} catch (SQLException e) {
+		}
     	out.writeLong(length);
     	out.writeObject(referenceStreamId);
+    	if (referenceStreamId == null) {
+    		writeReference(out);
+    	}
     }
-
+    
+    protected abstract void writeReference(ObjectOutput out) throws IOException;
+    
 }

Modified: trunk/common-core/src/main/java/org/teiid/core/types/XMLType.java
===================================================================
--- trunk/common-core/src/main/java/org/teiid/core/types/XMLType.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/java/org/teiid/core/types/XMLType.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -189,4 +189,29 @@
 			}
 		}
 	}
+	
+	@Override
+	long computeLength() throws SQLException {
+        if (this.reference instanceof SQLXMLImpl) {
+        	SQLXMLImpl impl = (SQLXMLImpl)this.reference;
+        	return impl.length();
+        }
+        return BaseLob.length(getBinaryStream());
+    }
+	
+	@Override
+	protected void readReference(ObjectInput in) throws IOException {
+		byte[] bytes = new byte[(int)getLength()];
+		in.readFully(bytes);
+		this.reference = new SQLXMLImpl(bytes);
+	}
+	
+	@Override
+	protected void writeReference(final ObjectOutput out) throws IOException {
+		try {
+			BlobType.writeBinary(out, getBinaryStream(), (int)length);
+		} catch (SQLException e) {
+			throw new IOException();
+		}
+	}
 }

Modified: trunk/common-core/src/main/resources/org/teiid/core/util/application.properties
===================================================================
--- trunk/common-core/src/main/resources/org/teiid/core/util/application.properties	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/main/resources/org/teiid/core/util/application.properties	2011-09-09 19:04:45 UTC (rev 3462)
@@ -1,5 +1,5 @@
-build.releaseNumber=${pom.version}
-build.number=${pom.version}
+build.releaseNumber=${project.version}
+build.number=${project.version}
 build.date=@build-date@
 copyright=Copyright (C) 2008-2009 Red Hat, Inc
 url=${site.url}
\ No newline at end of file

Modified: trunk/common-core/src/test/java/org/teiid/core/types/TestBlobValue.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/TestBlobValue.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/test/java/org/teiid/core/types/TestBlobValue.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -22,21 +22,14 @@
 
 package org.teiid.core.types;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
 import javax.sql.rowset.serial.SerialBlob;
 
-import org.teiid.core.types.BlobType;
-import org.teiid.core.util.UnitTestUtil;
-
-
 import junit.framework.TestCase;
 
+import org.junit.Test;
+import org.teiid.core.util.UnitTestUtil;
 
+
 public class TestBlobValue extends TestCase {
 
     public void testBlobValue() throws Exception {
@@ -55,22 +48,27 @@
         String key = bv.getReferenceStreamId();
         
         // now force to serialize
-        File saved = new File(UnitTestUtil.getTestScratchPath()+"/blobassaved.bin"); //$NON-NLS-1$
-        ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(saved));
-        out.writeObject(bv);
-        out.close();
-        
-        // now read back the object from serilized state
-        ObjectInputStream in = new ObjectInputStream(new FileInputStream(saved));
-        BlobType read = (BlobType)in.readObject();
+        BlobType read = UnitTestUtil.helpSerialize(bv);
                 
         // make sure we have kept the reference stream id
         assertEquals(key, read.getReferenceStreamId());
         
         // and lost the original object
         assertNull(read.getReference());
+    }
+    
+    @Test public void testReferencePersistence() throws Exception {
+    	String testString = "this is test clob"; //$NON-NLS-1$
+        SerialBlob blob = new SerialBlob(testString.getBytes());
         
-        saved.delete();
+        BlobType bv = new BlobType(blob);
+        bv.setReferenceStreamId(null);
+        // now force to serialize
+        BlobType read = UnitTestUtil.helpSerialize(bv);
+                
+        assertNull(read.getReferenceStreamId());
+        
+        assertEquals(testString, new String(read.getBytes(1, (int)blob.length())));
     }
     
 }

Modified: trunk/common-core/src/test/java/org/teiid/core/types/TestClobValue.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/TestClobValue.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/test/java/org/teiid/core/types/TestClobValue.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -24,12 +24,7 @@
 
 import static org.junit.Assert.*;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Reader;
 
 import javax.sql.rowset.serial.SerialClob;
@@ -56,15 +51,8 @@
         String key = cv.getReferenceStreamId();
         
         // now force to serialize
-        File saved = new File(UnitTestUtil.getTestScratchPath()+"/clobassaved.bin"); //$NON-NLS-1$
-        ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(saved));
-        out.writeObject(cv);
-        out.close();
+        ClobType read = UnitTestUtil.helpSerialize(cv);
         
-        // now read back the object from serilized state
-        ObjectInputStream in = new ObjectInputStream(new FileInputStream(saved));
-        ClobType read = (ClobType)in.readObject();
-        
         assertTrue(read.length() > 0);
                 
         // make sure we have kept the reference stream id
@@ -72,8 +60,21 @@
         
         // and lost the original object
         assertNull(read.getReference());
+    }
+    
+    @Test public void testReferencePersistence() throws Exception {
+    	String testString = "this is test clob"; //$NON-NLS-1$
+        SerialClob clob = new SerialClob(testString.toCharArray());
         
-        saved.delete();
+        ClobType cv = new ClobType(clob);
+        cv.setReferenceStreamId(null);
+        
+        // now force to serialize
+        ClobType read = UnitTestUtil.helpSerialize(cv);
+        
+        assertTrue(read.length() > 0);
+                
+        assertEquals(testString, read.getSubString(1, testString.length()));
     }
     
     @Test public void testClobSubstring() throws Exception {

Modified: trunk/common-core/src/test/java/org/teiid/core/types/TestXMLValue.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/types/TestXMLValue.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/common-core/src/test/java/org/teiid/core/types/TestXMLValue.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -22,20 +22,11 @@
 
 package org.teiid.core.types;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import junit.framework.TestCase;
 
-import org.teiid.core.types.SQLXMLImpl;
-import org.teiid.core.types.XMLType;
 import org.teiid.core.util.UnitTestUtil;
 
 
-import junit.framework.TestCase;
-
-
 public class TestXMLValue extends TestCase {
 
     public void testXMLValue() throws Exception {
@@ -55,22 +46,26 @@
         String key = xv.getReferenceStreamId();
         
         // now force to serialize
-        File saved = new File(UnitTestUtil.getTestScratchPath()+"/xmlsaved.bin"); //$NON-NLS-1$
-        ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(saved));
-        out.writeObject(xv);
-        out.close();
-        
-        // now read back the object from serilized state
-        ObjectInputStream in = new ObjectInputStream(new FileInputStream(saved));
-        XMLType read = (XMLType)in.readObject();
+        XMLType read = UnitTestUtil.helpSerialize(xv);
                 
         // make sure we have kept the reference stream id
         assertEquals(key, read.getReferenceStreamId());
         
         // and lost the original object
         assertNull(read.getReference());
+    }
+    
+    public void testReferencePersistence() throws Exception {
+        String testString = "<foo>this is an xml value test</foo>"; //$NON-NLS-1$
+        SQLXMLImpl xml = new SQLXMLImpl(testString); 
         
-        saved.delete();
+        XMLType xv = new XMLType(xml);
+        xv.setReferenceStreamId(null);
+        
+        // now force to serialize
+        XMLType read = UnitTestUtil.helpSerialize(xv);
+                
+        assertEquals(testString, read.getString());
     }
     
 }

Modified: trunk/documentation/caching-guide/src/main/docbook/en-US/content/hint-option.xml
===================================================================
--- trunk/documentation/caching-guide/src/main/docbook/en-US/content/hint-option.xml	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/documentation/caching-guide/src/main/docbook/en-US/content/hint-option.xml	2011-09-09 19:04:45 UTC (rev 3462)
@@ -19,6 +19,9 @@
 	   	<itemizedlist>
 	   		<listitem><para>The cache hint should appear at the beginning of the SQL.  It will not have any affect on INSERT/UPDATE/DELETE statements or virtual update procedure definitions.</para></listitem>
 	   		<listitem><para><emphasis>pref_mem</emphasis> - if present indicates that the cached results should prefer to remain in memory.  They are not however required to be memory only.
+	   		<note><para>Care should be taken to not over use the pref_mem option.  
+	   		The memory preference is implemented with Java soft references.  While soft references are effective at preventing out of memory conditions.  
+	   		Too much memory held by soft references can limit the effective working memory.  Consult your JVM options for clearing soft references if you need to tune their behavior.</para></note>
 	   		</para></listitem>
 	   		<listitem><para><emphasis>ttl:n</emphasis> - if present n indicates the time to live value in milliseconds.
 	   		</para></listitem>

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -56,6 +56,9 @@
 
 	@Override
 	public long getLength() {
+		if (fsos != null && !fsos.bytesWritten()) {
+			return fsos.getCount();
+		}
 		return lobBuffer.getLength();
 	}
 
@@ -86,7 +89,10 @@
 	}
 	
 	@Override
-	public boolean isPersistent() {
-		return fsos == null || fsos.bytesWritten();
+	public StorageMode getStorageMode() {
+		if (fsos == null) {
+			return StorageMode.PERSISTENT;
+		}
+		return StorageMode.MEMORY;
 	}
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/LobManager.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -45,6 +45,10 @@
 import org.teiid.core.types.SQLXMLImpl;
 import org.teiid.core.types.Streamable;
 import org.teiid.core.types.XMLType;
+import org.teiid.core.types.InputStreamFactory.BlobInputStreamFactory;
+import org.teiid.core.types.InputStreamFactory.ClobInputStreamFactory;
+import org.teiid.core.types.InputStreamFactory.SQLXMLInputStreamFactory;
+import org.teiid.core.types.InputStreamFactory.StorageMode;
 import org.teiid.core.util.ObjectConverterUtil;
 import org.teiid.query.QueryPlugin;
 import org.teiid.query.sql.symbol.Expression;
@@ -55,6 +59,16 @@
  */
 public class LobManager {
 	private Map<String, Streamable<?>> lobReferences = new ConcurrentHashMap<String, Streamable<?>>();
+	private boolean inlineLobs = true;
+	private int maxMemoryBytes = DataTypeManager.MAX_LOB_MEMORY_BYTES;
+	
+	public void setInlineLobs(boolean trackMemoryLobs) {
+		this.inlineLobs = trackMemoryLobs;
+	}
+	
+	public void setMaxMemoryBytes(int maxMemoryBytes) {
+		this.maxMemoryBytes = maxMemoryBytes;
+	}
 
 	public void updateReferences(int[] lobIndexes, List<?> tuple)
 			throws TeiidComponentException {
@@ -64,6 +78,16 @@
 				continue;
 			}
 			Streamable lob = (Streamable) anObj;
+			try {
+				if (inlineLobs 
+						&& (InputStreamFactory.getStorageMode(lob) == StorageMode.MEMORY
+						|| lob.length()*(lob instanceof ClobType?2:1) <= maxMemoryBytes)) {
+					lob.setReferenceStreamId(null);
+					continue;
+				}
+			} catch (SQLException e) {
+				//presumably the lob is bad, but let it slide for now
+			}
 			if (lob.getReference() == null) {
 				lob.setReference(getLobReference(lob.getReferenceStreamId()).getReference());
 			} else {
@@ -81,14 +105,14 @@
     	return lob;
     }
         
-    public static int[] getLobIndexes(List expressions) {
+    public static int[] getLobIndexes(List<? extends Expression> expressions) {
     	if (expressions == null) {
     		return null;
     	}
 		int[] result = new int[expressions.size()];
 		int resultIndex = 0;
 	    for (int i = 0; i < expressions.size(); i++) {
-	    	Expression expr = (Expression) expressions.get(i);
+	    	Expression expr = expressions.get(i);
 	        if (DataTypeManager.isLOB(expr.getType()) || expr.getType() == DataTypeManager.DefaultDataClasses.OBJECT) {
 	        	result[resultIndex++] = i;
 	        }
@@ -115,7 +139,7 @@
 			try {
 				BaseLob baseLob = (BaseLob)lob.getReference();
 				InputStreamFactory isf = baseLob.getStreamFactory();
-				if (isf.isPersistent()) {
+				if (isf.getStorageMode() == StorageMode.PERSISTENT) {
 					return lob;
 				}
 			} catch (SQLException e) {
@@ -127,19 +151,15 @@
 		Streamable<?> persistedLob;
 					
 		try {
-			InputStreamFactory isf = new InputStreamFactory() {
-				@Override
-				public InputStream getInputStream() throws IOException {
-			    	if (lob instanceof BlobType) {
-			    		return new BlobInputStreamFactory((Blob)lob).getInputStream();
-			    	}
-			    	else if (lob instanceof ClobType) {
-			    		return new ClobInputStreamFactory((Clob)lob).getInputStream();
-			    	}
-			    	return new SQLXMLInputStreamFactory((SQLXML)lob).getInputStream();
-				}					
-			};
-			InputStream is = isf.getInputStream();
+			InputStream is = null;
+	    	if (lob instanceof BlobType) {
+	    		is = new BlobInputStreamFactory((Blob)lob).getInputStream();
+	    	}
+	    	else if (lob instanceof ClobType) {
+	    		is = new ClobInputStreamFactory((Clob)lob).getInputStream();
+	    	} else {
+	    		is = new SQLXMLInputStreamFactory((SQLXML)lob).getInputStream();
+	    	}
 			OutputStream fsos = store.createOutputStream();
 			length = ObjectConverterUtil.write(fsos, is, bytes, -1);
 		} catch (IOException e) {
@@ -156,8 +176,8 @@
 			}
 			
 			@Override
-			public boolean isPersistent() {
-				return true;
+			public StorageMode getStorageMode() {
+				return StorageMode.PERSISTENT;
 			}
 		};			
 		
@@ -178,4 +198,8 @@
 		}		
 		return persistedLob;		
 	}
+	
+	public int getLobCount() {
+		return this.lobReferences.size();
+	}
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/TupleBuffer.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -94,6 +94,12 @@
 		this.batchSize = batchSize;		
 	}
 	
+	public void setInlineLobs(boolean inline) {
+		if (this.lobManager != null) {
+			this.lobManager.setInlineLobs(inline);
+		}
+	}
+	
 	public String getId() {
 		if (this.uuid == null) {
 			this.uuid = java.util.UUID.randomUUID().toString();
@@ -373,4 +379,11 @@
 		return types;
 	}
 	
+	public int getLobCount() {
+		if (this.lobManager == null) {
+			return 0;
+		}
+		return this.lobManager.getLobCount();
+	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -94,12 +94,12 @@
 	
 	private final class CleanupHook implements org.teiid.common.buffer.BatchManager.CleanupHook {
 		
-		private long id;
+		private Long id;
 		private WeakReference<BatchManagerImpl> ref;
 		
-		CleanupHook(long id, BatchManagerImpl batchManager) {
+		CleanupHook(Long id, WeakReference<BatchManagerImpl> batchManager) {
 			this.id = id;
-			this.ref = new WeakReference<BatchManagerImpl>(batchManager);
+			this.ref = batchManager;
 		}
 		
 		public void cleanup() {
@@ -120,6 +120,7 @@
 		private AtomicLong unusedSpace = new AtomicLong();
 		private int[] lobIndexes;
 		private SizeUtility sizeUtility;
+		private WeakReference<BatchManagerImpl> ref = new WeakReference<BatchManagerImpl>(this);
 
 		private BatchManagerImpl(String newID, int[] lobIndexes) {
 			this.id = newID;
@@ -136,7 +137,7 @@
 		@Override
 		public ManagedBatch createManagedBatch(TupleBatch batch, boolean softCache)
 				throws TeiidComponentException {
-			ManagedBatchImpl mbi = new ManagedBatchImpl(batch, this, softCache);
+			ManagedBatchImpl mbi = new ManagedBatchImpl(batch, ref, softCache);
 			mbi.addToCache(false);
 			persistBatchReferences();
 			return mbi;
@@ -196,6 +197,11 @@
 		public void remove() {
 			this.store.remove();
 		}
+		
+		@Override
+		public String toString() {
+			return id;
+		}
 	}
 
 	/**
@@ -205,10 +211,18 @@
 		TreeMap<Long, ManagedBatchImpl> batches = new TreeMap<Long, ManagedBatchImpl>();
 		Long lastUsed = null;
 		
-		ManagedBatchImpl removeBatch(long row) {
+		ManagedBatchImpl removeBatch(Long row) {
 			ManagedBatchImpl result = batches.remove(row);
 			if (result != null) {
 				activeBatchKB -= result.sizeEstimate;
+				if (result.softCache) {
+					BatchSoftReference ref = (BatchSoftReference)result.batchReference;
+					if (ref != null) {
+						maxReserveKB += ref.sizeEstimate;
+						ref.sizeEstimate = 0;
+						ref.clear();
+					}
+				}
 			}
 			return result;
 		}
@@ -220,21 +234,22 @@
 		private volatile TupleBatch activeBatch;
 		private volatile Reference<TupleBatch> batchReference;
 		private int beginRow;
-		private BatchManagerImpl batchManager;
-		private long id;
+		private WeakReference<BatchManagerImpl> managerRef;
+		private Long id;
 		private LobManager lobManager;
 		private int sizeEstimate;
 		
-		public ManagedBatchImpl(TupleBatch batch, BatchManagerImpl manager, boolean softCache) {
+		public ManagedBatchImpl(TupleBatch batch, WeakReference<BatchManagerImpl> ref, boolean softCache) {
 			this.softCache = softCache;
 			id = batchAdded.incrementAndGet();
 			this.activeBatch = batch;
 			this.beginRow = batch.getBeginRow();
-			this.batchManager = manager;
-			if (this.batchManager.lobIndexes != null) {
+			this.managerRef = ref;
+			BatchManagerImpl batchManager = ref.get();
+			if (batchManager.lobIndexes != null) {
 				this.lobManager = new LobManager();
 			}
-			sizeEstimate = (int) Math.max(1, manager.sizeUtility.getBatchSize(batch) / 1024);
+			sizeEstimate = (int) Math.max(1, batchManager.sizeUtility.getBatchSize(batch) / 1024);
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", id, "with size estimate", sizeEstimate); //$NON-NLS-1$ //$NON-NLS-2$
 			}
@@ -247,6 +262,11 @@
 		}
 
 		private void addToCache(boolean update) {
+			BatchManagerImpl batchManager = managerRef.get();
+			if (batchManager == null) {
+				remove();
+				return;
+			}
 			synchronized (activeBatches) {
 				TupleBatch batch = this.activeBatch;
 				if (batch == null) {
@@ -272,11 +292,25 @@
 
 		@Override
 		public TupleBatch getBatch(boolean cache, String[] types) throws TeiidComponentException {
+			BatchManagerImpl batchManager = managerRef.get();
+			if (batchManager == null) {
+				remove();
+				throw new AssertionError("Already removed"); //$NON-NLS-1$
+			}
 			long reads = readAttempts.incrementAndGet();
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, batchManager.id, "getting batch", reads, "reference hits", referenceHit.get()); //$NON-NLS-1$ //$NON-NLS-2$
 			}
 			synchronized (activeBatches) {
+				for (int i = 0; i < 10; i++) {
+					BatchSoftReference ref = (BatchSoftReference)SOFT_QUEUE.poll();
+					if (ref == null) {
+						break;
+					}
+					maxReserveKB += ref.sizeEstimate;
+					ref.sizeEstimate = 0;
+					ref.clear();
+				}
 				TupleBufferInfo tbi = activeBatches.remove(batchManager.id);
 				if (tbi != null) { 
 					boolean put = true;
@@ -316,7 +350,7 @@
 					LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, batchManager.id, id, "reading batch from disk, total reads:", count); //$NON-NLS-1$
 				}
 				try {
-					this.batchManager.compactionLock.readLock().lock();
+					batchManager.compactionLock.readLock().lock();
 					long[] info = batchManager.physicalMapping.get(this.id);
 					Assertion.isNotNull(info, "Invalid batch " + id); //$NON-NLS-1$
 					ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(batchManager.store.createInputStream(info[0]), IO_BUFFER_SIZE));
@@ -340,12 +374,17 @@
 		        } catch (ClassNotFoundException e) {
 		        	throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", batchManager.id)); //$NON-NLS-1$
 		        } finally {
-		        	this.batchManager.compactionLock.readLock().unlock();
+		        	batchManager.compactionLock.readLock().unlock();
 		        }
 			}
 		}
 
 		public synchronized void persist() throws TeiidComponentException {
+			BatchManagerImpl batchManager = managerRef.get();
+			if (batchManager == null) {
+				remove();
+				return;
+			}
 			boolean lockheld = false;
             try {
 				TupleBatch batch = activeBatch;
@@ -376,7 +415,10 @@
 						}
 					}
 					if (softCache) {
-						this.batchReference = new SoftReference<TupleBatch>(batch);
+						this.batchReference = new BatchSoftReference(batch, SOFT_QUEUE, sizeEstimate);
+						synchronized (activeBatches) {
+							maxReserveKB -= sizeEstimate;
+						}
 					} else if (useWeakReferences) {
 						this.batchReference = new WeakReference<TupleBatch>(batch);
 					}
@@ -389,26 +431,39 @@
 				persistent = true;
 				activeBatch = null;
 				if (lockheld) {
-					this.batchManager.compactionLock.writeLock().unlock();
+					batchManager.compactionLock.writeLock().unlock();
 				}
 			}
 		}
 
 		public void remove() {
-			cleanupManagedBatch(batchManager, id);
+			cleanupManagedBatch(managerRef.get(), id);
 		}
 				
 		@Override
 		public CleanupHook getCleanupHook() {
-			return new CleanupHook(id, batchManager);
+			return new CleanupHook(id, managerRef);
 		}
 		
 		@Override
 		public String toString() {
-			return "ManagedBatch " + batchManager.id + " " + this.id + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+			return "ManagedBatch " + managerRef.get() + " " + this.id + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
 		}
 	}
 	
+	private static class BatchSoftReference extends SoftReference<TupleBatch> {
+
+		private int sizeEstimate;
+		
+		public BatchSoftReference(TupleBatch referent,
+				ReferenceQueue<? super TupleBatch> q, int sizeEstimate) {
+			super(referent, q);
+			this.sizeEstimate = sizeEstimate;
+		}
+	}
+	
+	private static ReferenceQueue<? super TupleBatch> SOFT_QUEUE = new ReferenceQueue<TupleBatch>();
+	
 	// Configuration 
     private int connectorBatchSize = BufferManager.DEFAULT_CONNECTOR_BATCH_SIZE;
     private int processorBatchSize = BufferManager.DEFAULT_PROCESSOR_BATCH_SIZE;
@@ -419,6 +474,7 @@
     private volatile int reserveBatchKB;
     private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
     private boolean useWeakReferences = true;
+    private boolean inlineLobs = true;
 
     private ReentrantLock lock = new ReentrantLock(true);
     private Condition batchesFreed = lock.newCondition();
@@ -508,10 +564,11 @@
         if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
         	LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating TupleBuffer:", newID, elements, Arrays.toString(tupleBuffer.getTypes()), "of type", tupleSourceType); //$NON-NLS-1$ //$NON-NLS-2$
         }
+    	tupleBuffer.setInlineLobs(inlineLobs);
         return tupleBuffer;
     }
     
-    private void cleanupManagedBatch(BatchManagerImpl batchManager, long id) {
+    private void cleanupManagedBatch(BatchManagerImpl batchManager, Long id) {
 		synchronized (activeBatches) {
 			TupleBufferInfo tbi = activeBatches.get(batchManager.id);
 			if (tbi != null && tbi.removeBatch(id) != null) {
@@ -730,4 +787,8 @@
 		this.useWeakReferences = useWeakReferences;
 	}
 	
+	public void setInlineLobs(boolean inlineLobs) {
+		this.inlineLobs = inlineLobs;
+	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/SizeUtility.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -112,9 +112,8 @@
 		} else if (type == DataTypeManager.DefaultDataClasses.BIG_DECIMAL) {
 		 	return isValueCacheEnabled?150:200;
 		}
-		return 512; //assumes buffer overhead in the case of lobs
-		//however the account for lobs is misleading as the lob
-		//references are not actually removed from memory
+		return 512; //this is is misleading for lobs
+		//most references are not actually removed from memory
 	}
     
     /**

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -872,6 +872,12 @@
 
 	private void doneProducingBatches() {
 		this.doneProducingBatches = true;
+		//TODO: we could perform more tracking to know what source lobs are in use
+		if (this.resultsBuffer.getLobCount() == 0) {
+			for (DataTierTupleSource connectorRequest : this.connectorInfo.values()) {
+				connectorRequest.fullyCloseSource();
+		    }
+		}
 		dqpCore.finishProcessing(this);
 	}
 	

Modified: trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/main/java/org/teiid/query/metadata/TransformationMetadata.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -125,8 +125,8 @@
 		}
 		
 		@Override
-		public void free() throws IOException {
-			f.close();
+		public StorageMode getStorageMode() {
+			return StorageMode.PERSISTENT;
 		}
 	}
 

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestLobManager.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -66,6 +66,7 @@
 		}));		
 		
 		LobManager lobManager = new LobManager();
+		lobManager.setMaxMemoryBytes(4);
 		lobManager.updateReferences(new int[] {0,1}, Arrays.asList(clob, blob));
 		lobManager.persist(fs);
 		

Modified: trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/TestTupleBuffer.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -107,6 +107,7 @@
 		x.setType(DataTypeManager.DefaultDataClasses.CLOB);
 		List<ElementSymbol> schema = Arrays.asList(x);
 		TupleBuffer tb = new TupleBuffer(new FakeBatchManager(), "x", schema, LobManager.getLobIndexes(schema), 32); //$NON-NLS-1$
+		tb.setInlineLobs(false);
 		ClobType c = new ClobType(new SerialClob(new char[0]));
 		TupleBatch batch = new TupleBatch(1, new List[] {Arrays.asList(c)});
 		tb.addTupleBatch(batch, false);

Modified: trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -120,7 +120,9 @@
 			
 			@Override
 			public BufferManager getBufferManager() {
-				return BufferManagerFactory.createBufferManager();
+				BufferManagerImpl bm = BufferManagerFactory.createBufferManager();
+				bm.setInlineLobs(false);
+				return bm;
 			}
 		});
         core.setCacheFactory(new DefaultCacheFactory());

Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-09-09 17:10:27 UTC (rev 3461)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-09-09 19:04:45 UTC (rev 3462)
@@ -65,6 +65,7 @@
     private int maxProcessingKb = BufferManager.DEFAULT_MAX_PROCESSING_KB;
     private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
     private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
+    private boolean inlineLobs = true;
 	private FileStorageManager fsm;
 	
     /**
@@ -144,6 +145,10 @@
 	public void setConnectorBatchSize(int size) {
 		this.connectorBatchSize = size;
 	}
+	
+	public void setInlineLobs(boolean inlineLobs) {
+		this.inlineLobs = inlineLobs;
+	}
 
 	public File getBufferDirectory() {
 		return bufferDir;
@@ -152,6 +157,10 @@
 	public boolean isUseDisk() {
 		return this.useDisk;
 	}
+	
+	public boolean isInlineLobs() {
+		return inlineLobs;
+	}
 
 	@ManagementProperty(description="The max row count of a batch sent internally within the query processor. Should be <= the connectorBatchSize. (default 256)")
 	public int getProcessorBatchSize() {



More information about the teiid-commits mailing list