[teiid-commits] teiid SVN: r1235 - in trunk: common-core/src/main/java/com/metamatrix/common/types and 20 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Aug 12 13:39:16 EDT 2009


Author: shawkins
Date: 2009-08-12 13:39:14 -0400 (Wed, 12 Aug 2009)
New Revision: 1235

Added:
   trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java
Removed:
   trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
   trunk/engine/src/main/java/com/metamatrix/common/lob/
Modified:
   trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java
   trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java
   trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java
   trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java
   trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties
   trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java
   trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java
   trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
   trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java
   trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java
   trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
   trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
   trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
   trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
Log:
TEIID-767 TEIID-771 simplifying the lob handling of buffermanager and merging in the memorystate logic.  reviewed by RR.

Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMBlob.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -55,7 +55,7 @@
      */
     public static Blob newInstance(StreamingLobChunckProducer.Factory lobChunckFactory, BlobType blob) {
     	if (!Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
-        	Blob sourceBlob = blob.getSourceBlob();
+        	Blob sourceBlob = blob.getReference();
         	if (sourceBlob != null) {
         		return sourceBlob;
         	}

Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMClob.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -72,7 +72,7 @@
 	
     public static Clob newInstance(StreamingLobChunckProducer.Factory lobChunckFactory, ClobType clob) throws SQLException {
     	if (!Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
-            Clob sourceClob = clob.getSourceClob();
+            Clob sourceClob = clob.getReference();
             if (sourceClob != null) {
             	return sourceClob;
             }

Modified: trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java
===================================================================
--- trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/client-jdbc/src/main/java/com/metamatrix/jdbc/MMSQLXML.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -59,7 +59,7 @@
     
     public static SQLXML newInstance(StreamingLobChunckProducer.Factory lobChunckFactory, XMLType srcXML) throws SQLException {
     	if (Boolean.getBoolean(Streamable.FORCE_STREAMING)) {
-    		SQLXML sourceSQLXML = srcXML.getSourceSQLXML();
+    		SQLXML sourceSQLXML = srcXML.getReference();
         	if (sourceSQLXML != null) {
         		return sourceSQLXML;
         	}

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/BlobType.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -30,20 +30,15 @@
 import java.sql.SQLException;
 
 import javax.sql.rowset.serial.SerialBlob;
-import javax.sql.rowset.serial.SerialException;
 
-import com.metamatrix.core.CorePlugin;
 import com.metamatrix.core.MetaMatrixRuntimeException;
-import com.metamatrix.core.util.ArgCheck;
 
 /**
  * Represent a value of type "blob", which can be streamable from client
  */
-public final class BlobType implements Streamable, Blob {
+public final class BlobType extends Streamable<Blob> implements Blob {
 
-	private transient Blob srcBlob;
-	private String streamId;
-	private String persistentId;
+	private static final long serialVersionUID = 1294191629070433450L;
 	private long length = -1;
     
     /**
@@ -54,8 +49,7 @@
     }
 
     public BlobType(Blob blob) {
-    	ArgCheck.isNotNull(blob);
-        this.srcBlob = blob;
+    	super(blob);
         try {
             this.length = blob.length();
         } catch (SQLException e) {
@@ -63,52 +57,18 @@
         }
     }
     
-    public Blob getSourceBlob() {
-    	return srcBlob;
-    }
-    
     /** 
-     * @see com.metamatrix.common.types.Streamable#getReferenceStreamId()
-     */
-    public String getReferenceStreamId() {
-        return this.streamId;
-    }
-    
-    /** 
-     * @see com.metamatrix.common.types.Streamable#setReferenceStreamId(java.lang.String)
-     */
-    public void setReferenceStreamId(String id) {
-        this.streamId = id;
-    }
-    
-    /** 
-     * @see com.metamatrix.common.types.Streamable#getPersistenceStreamId()
-     */
-    public String getPersistenceStreamId() {
-        return persistentId;
-    }
-
-    /** 
-     * @see com.metamatrix.common.types.Streamable#setPersistenceStreamId(java.lang.String)
-     */
-    public void setPersistenceStreamId(String id) {
-        this.persistentId = id;
-    }    
-    
-    /** 
      * @see java.sql.Blob#getBinaryStream()
      */
     public InputStream getBinaryStream() throws SQLException {
-        checkReference();
-        return this.srcBlob.getBinaryStream();
+        return this.reference.getBinaryStream();
     }
 
     /** 
      * @see java.sql.Blob#getBytes(long, int)
      */
     public byte[] getBytes(long pos, int length) throws SQLException {
-        checkReference();
-        return this.srcBlob.getBytes(pos, length);
+        return this.reference.getBytes(pos, length);
     }
 
     /** 
@@ -121,32 +81,28 @@
         }
         
         // if did not find before then do it again.
-        checkReference();
-        return this.srcBlob.length();
+        return this.reference.length();
     }
 
     /** 
      * @see java.sql.Blob#position(java.sql.Blob, long)
      */
     public long position(Blob pattern, long start) throws SQLException {
-        checkReference();
-        return this.srcBlob.position(pattern, start);
+        return this.reference.position(pattern, start);
     }
 
     /** 
      * @see java.sql.Blob#position(byte[], long)
      */
     public long position(byte[] pattern, long start) throws SQLException {
-        checkReference();
-        return this.srcBlob.position(pattern, start);
+        return this.reference.position(pattern, start);
     }
 
     /** 
      * @see java.sql.Blob#setBinaryStream(long)
      */
     public OutputStream setBinaryStream(long pos) throws SQLException {
-        checkReference();
-        return this.srcBlob.setBinaryStream(pos);
+        return this.reference.setBinaryStream(pos);
     }
 
     /** 
@@ -157,63 +113,23 @@
                         byte[] bytes,
                         int offset,
                         int len) throws SQLException {
-        checkReference();
-        return this.srcBlob.setBytes(pos, bytes, offset, len);
+        return this.reference.setBytes(pos, bytes, offset, len);
     }
 
     /** 
      * @see java.sql.Blob#setBytes(long, byte[])
      */
     public int setBytes(long pos, byte[] bytes) throws SQLException {
-        checkReference();
-        return this.srcBlob.setBytes(pos, bytes);
+        return this.reference.setBytes(pos, bytes);
     }
 
     /** 
      * @see java.sql.Blob#truncate(long)
      */
     public void truncate(long len) throws SQLException {
-        checkReference();
-        this.srcBlob.truncate(len);
+        this.reference.truncate(len);
     }
     
-    /** 
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    public boolean equals(Object o) {
-    	if (this == o) {
-    		return true;
-    	}
-    	
-    	if (!(o instanceof BlobType)) {
-    		return false;
-    	}
-    	
-    	BlobType other = (BlobType)o;
-    	
-    	if (this.srcBlob != null) {
-    		return this.srcBlob.equals(other.srcBlob);
-    	}
-    	
-    	return this.persistentId == other.persistentId
-				&& this.streamId == other.streamId;
-
-    }
-
-    /** 
-     * @see java.lang.Object#toString()
-     */
-    public String toString() {
-        checkReference();
-        return srcBlob.toString();
-    }    
-        
-    private void checkReference() {
-        if (this.srcBlob == null) {
-            throw new InvalidReferenceException(CorePlugin.Util.getString("BlobValue.InvalidReference")); //$NON-NLS-1$
-        }
-    }
-    
     /**
      * Utility Method to convert blob into byte array  
      * @param blob
@@ -234,14 +150,12 @@
     }
     //## JDBC4.0-begin ##
 	public void free() throws SQLException {
-		checkReference();
-		this.srcBlob.free();
+		this.reference.free();
 	}
 
 	public InputStream getBinaryStream(long pos, long length)
 			throws SQLException {
-		checkReference();
-		return this.srcBlob.getBinaryStream(pos, length);
+		return this.reference.getBinaryStream(pos, length);
 	}
 	//## JDBC4.0-end ##
 	

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/ClobType.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -31,41 +31,27 @@
 import java.sql.Clob;
 import java.sql.SQLException;
 
-import javax.sql.rowset.serial.SerialBlob;
 import javax.sql.rowset.serial.SerialClob;
-import javax.sql.rowset.serial.SerialException;
 
-import com.metamatrix.core.CorePlugin;
 import com.metamatrix.core.MetaMatrixRuntimeException;
 
 /**
  * This is wrapper on top of a "clob" object, which implements the "java.sql.Clob"
  * interface. This class also implements the Streamable interface
  */
-public final class ClobType implements Streamable, Clob, Sequencable {
+public final class ClobType extends Streamable<Clob> implements Clob, Sequencable {
 
-	private transient Clob srcClob;    
-	private String streamId;
-	private String persistentId;
+	private static final long serialVersionUID = 2753412502127824104L;
 	private long length = -1;
     
     /**
      * Can't construct
      */
     ClobType() {
-        super();
     }
     
-    public Clob getSourceClob() {
-    	return this.srcClob;
-    }
-    
     public ClobType(Clob clob) {
-        if (clob == null) {
-            throw new IllegalArgumentException(CorePlugin.Util.getString("ClobValue.isNUll")); //$NON-NLS-1$
-        }
-        // this will serve as the in VM reference
-        this.srcClob = clob;
+    	super(clob);
         
         try {
             this.length = clob.length();
@@ -73,57 +59,26 @@
             // ignore.
         }
     }
-        
+            
     /** 
-     * @see com.metamatrix.common.types.Streamable#getReferenceStreamId()
-     */
-    public String getReferenceStreamId() {
-        return this.streamId;
-    }
-    
-    /** 
-     * @see com.metamatrix.common.types.Streamable#setReferenceStreamId(java.lang.String)
-     */
-    public void setReferenceStreamId(String id) {
-        this.streamId = id;
-    }
-    
-    /** 
-     * @see com.metamatrix.common.types.Streamable#getPersistenceStreamId()
-     */
-    public String getPersistenceStreamId() {
-        return persistentId;
-    }
-
-    /** 
-     * @see com.metamatrix.common.types.Streamable#setPersistenceStreamId(java.lang.String)
-     */
-    public void setPersistenceStreamId(String id) {
-        this.persistentId = id;
-    }     
-    
-    /** 
      * @see java.sql.Clob#getAsciiStream()
      */
     public InputStream getAsciiStream() throws SQLException {
-        checkReference();
-        return this.srcClob.getAsciiStream();
+        return this.reference.getAsciiStream();
     }
 
     /** 
      * @see java.sql.Clob#getCharacterStream()
      */
     public Reader getCharacterStream() throws SQLException {
-        checkReference();
-        return this.srcClob.getCharacterStream();
+        return this.reference.getCharacterStream();
     }
 
     /** 
      * @see java.sql.Clob#getSubString(long, int)
      */
     public String getSubString(long pos, int length) throws SQLException {
-        checkReference();
-        return this.srcClob.getSubString(pos, length);
+        return this.reference.getSubString(pos, length);
     }
 
     /** 
@@ -134,40 +89,35 @@
             return this.length;
         }
         
-        checkReference();
-        return this.srcClob.length();
+        return this.reference.length();
     }
 
     /** 
      * @see java.sql.Clob#position(java.sql.Clob, long)
      */
     public long position(Clob searchstr, long start) throws SQLException {
-        checkReference();
-        return this.srcClob.position(searchstr, start);
+        return this.reference.position(searchstr, start);
     }
 
     /** 
      * @see java.sql.Clob#position(java.lang.String, long)
      */
     public long position(String searchstr, long start) throws SQLException {
-        checkReference();
-        return this.srcClob.position(searchstr, start);
+        return this.reference.position(searchstr, start);
     }
 
     /** 
      * @see java.sql.Clob#setAsciiStream(long)
      */
     public OutputStream setAsciiStream(long pos) throws SQLException {
-        checkReference();
-        return this.srcClob.setAsciiStream(pos);
+        return this.reference.setAsciiStream(pos);
     }
 
     /** 
      * @see java.sql.Clob#setCharacterStream(long)
      */
     public Writer setCharacterStream(long pos) throws SQLException {
-        checkReference();
-        return this.srcClob.setCharacterStream(pos);
+        return this.reference.setCharacterStream(pos);
     }
 
     /** 
@@ -177,62 +127,23 @@
                          String str,
                          int offset,
                          int len) throws SQLException {
-        checkReference();
-        return this.srcClob.setString(pos, str, offset, len);
+        return this.reference.setString(pos, str, offset, len);
     }
 
     /** 
      * @see java.sql.Clob#setString(long, java.lang.String)
      */
     public int setString(long pos, String str) throws SQLException {
-        checkReference();
-        return this.srcClob.setString(pos, str);
+        return this.reference.setString(pos, str);
     }
 
     /** 
      * @see java.sql.Clob#truncate(long)
      */
     public void truncate(long len) throws SQLException {
-        checkReference();
-        this.srcClob.truncate(len);
+        this.reference.truncate(len);
     }    
 
-    /** 
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    public boolean equals(Object o) {
-    	if (this == o) {
-    		return true;
-    	}
-    	
-    	if (!(o instanceof ClobType)) {
-    		return false;
-    	}
-    	
-    	ClobType other = (ClobType)o;
-    	
-    	if (this.srcClob != null) {
-    		return this.srcClob.equals(other.srcClob);
-    	}
-    	
-    	return this.persistentId == other.persistentId
-				&& this.streamId == other.streamId;
-    }
-
-    /** 
-     * @see java.lang.Object#toString()
-     */
-    public String toString() {
-        checkReference();
-        return srcClob.toString();
-    }
-            
-    private void checkReference() {
-        if (this.srcClob == null) {
-            throw new InvalidReferenceException(CorePlugin.Util.getString("ClobValue.InvalidReference")); //$NON-NLS-1$
-        }
-    }     
-    
     /**
      * Utility method to convert to String  
      * @param clob
@@ -255,7 +166,6 @@
     private final static int CHAR_SEQUENCE_BUFFER_SIZE = 2 << 12;
     
     public CharSequence getCharSequence() {
-        checkReference();
         return new CharSequence() {
 
         	private String buffer;
@@ -299,13 +209,11 @@
     }
     //## JDBC4.0-begin ##
 	public void free() throws SQLException {
-		checkReference();
-		this.srcClob.free();
+		this.reference.free();
 	}
 
 	public Reader getCharacterStream(long pos, long length) throws SQLException {
-		checkReference();
-		return this.srcClob.getCharacterStream(pos, length);
+		return this.reference.getCharacterStream(pos, length);
 	}
 	//## JDBC4.0-end ##
 	

Deleted: trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/InvalidReferenceException.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -1,34 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.types;
-
-
-/** 
- * A exception class to define that a invalid reference has been accessed.
- */
-public class InvalidReferenceException extends RuntimeException {
-    public InvalidReferenceException() {}
-    public InvalidReferenceException(String msg){
-        super(msg);
-    }
-}

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/Streamable.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -23,8 +23,11 @@
 package com.metamatrix.common.types;
 
 import java.io.Serializable;
+import java.lang.ref.PhantomReference;
 
+import com.metamatrix.core.CorePlugin;
 
+
 /**
  * A large value object which can be streamable in chunks of data each time
  * 
@@ -38,31 +41,71 @@
  * the process worker to in case the reference object has lost its state and we 
  * need to reinsate the object from the disk.
  */
-public interface Streamable extends Serializable {
-    static final String FORCE_STREAMING = "FORCE_STREAMING"; //$NON-NLS-1$
+public abstract class Streamable<T> implements Serializable {
+    public static final String FORCE_STREAMING = "FORCE_STREAMING"; //$NON-NLS-1$
     public static final int STREAMING_BATCH_SIZE_IN_BYTES = 102400; // 100K
+
+    private String referenceStreamId;
+    private String persistenceStreamId;
+    protected transient T reference;
     
-    /**
-     * Reference Stream ID in the server 
-     * @return string - this is buffer managers tuple source id.
-     */
-    String getReferenceStreamId();
+    public Streamable() {
+    	
+	}
     
-    /**
-     * Reference Stream ID in the server
-     * @param id this is buffer managers tuple source id.
-     */
-    void setReferenceStreamId(String id);    
+    public Streamable(T reference) {
+        if (reference == null) {
+            throw new IllegalArgumentException(CorePlugin.Util.getString("Streamable.isNUll")); //$NON-NLS-1$
+        }
+
+    	this.reference = reference;
+    }
     
-    /**
-     * Persitence Stream ID in the server 
-     * @return string - this is buffer managers tuple source id.
-     */
-    String getPersistenceStreamId();
+    public T getReference() {
+		return reference;
+	}
     
-    /**
-     * Persitence Stream ID in the server
-     * @param id this is buffer managers tuple source id.
-     */
-    void setPersistenceStreamId(String id);       
+    public void setReference(T reference) {
+		this.reference = reference;
+	}
+    
+    public String getReferenceStreamId() {
+        return this.referenceStreamId;
+    }
+    
+    public void setReferenceStreamId(String id) {
+        this.referenceStreamId = id;
+    }
+    
+    public String getPersistenceStreamId() {
+        return persistenceStreamId;
+    }
+
+    public void setPersistenceStreamId(String id) {
+        this.persistenceStreamId = id;
+    } 
+    
+    @Override
+    public String toString() {
+        return reference.toString();
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+    	if (this == obj) {
+    		return true;
+    	}
+    	if (!(obj instanceof Streamable<?>)) {
+    		return false;
+    	}
+    	Streamable<?> other = (Streamable<?>)obj;
+    	
+    	if (this.reference != null) {
+    		return this.reference.equals(other.reference);
+    	}
+    	
+    	return this.persistenceStreamId == other.persistenceStreamId
+		&& this.referenceStreamId == other.referenceStreamId;
+    }
+
 }

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/XMLType.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -38,125 +38,57 @@
 import javax.xml.transform.Result;
 import javax.xml.transform.Source;
 
-import com.metamatrix.core.CorePlugin;
-
 /**
  * This class represents the SQLXML object along with the Streamable interface. This is
  * class used everywhere in the MetaMatrix framework, but clients are restricted to use
  * only SQLXML interface on top of this.
  */
-public final class XMLType implements Streamable, SQLXML {
+public final class XMLType extends Streamable<SQLXML> implements SQLXML {
 
-    private transient SQLXML srcXML;
-    private String referenceStreamId;
-    private String persistenceStreamId;
+	private static final long serialVersionUID = -7922647237095135723L;
     
     public XMLType(){
         
     }
     
-    public SQLXML getSourceSQLXML() {
-    	return srcXML;
-    }
-         
     public XMLType(SQLXML xml) {      
-        if (xml == null) {
-            throw new IllegalArgumentException(CorePlugin.Util.getString("XMLValue.isNUll")); //$NON-NLS-1$
-        }
-        
-        // this will serve as the in VM reference
-        this.srcXML = xml;
+        super(xml);
     }    
-            
-    public String getReferenceStreamId() {
-        return this.referenceStreamId;
-    }
-    
-    public void setReferenceStreamId(String id) {
-        this.referenceStreamId = id;
-    }
-    
-    public String getPersistenceStreamId() {
-        return persistenceStreamId;
-    }
-
-    public void setPersistenceStreamId(String id) {
-        this.persistenceStreamId = id;
-    }      
-        
+                    
     public InputStream getBinaryStream() throws SQLException {
-        checkReference();
-        return this.srcXML.getBinaryStream();
+        return this.reference.getBinaryStream();
     }
 
     public Reader getCharacterStream() throws SQLException {
-        checkReference();
-        return this.srcXML.getCharacterStream();
+        return this.reference.getCharacterStream();
     }
 
     public <T extends Source> T getSource(Class<T> sourceClass) throws SQLException {
-        checkReference();
-        return this.srcXML.getSource(sourceClass);
+        return this.reference.getSource(sourceClass);
     }
 
     public String getString() throws SQLException {
-        checkReference();
-        return this.srcXML.getString();
+        return this.reference.getString();
     }
 
     public OutputStream setBinaryStream() throws SQLException {
-        checkReference();
-        return this.srcXML.setBinaryStream();
+        return this.reference.setBinaryStream();
     }
 
     public Writer setCharacterStream() throws SQLException {
-        checkReference();
-        return this.srcXML.setCharacterStream();
+        return this.reference.setCharacterStream();
     }
 
     public void setString(String value) throws SQLException {
-        checkReference();
-        this.srcXML.setString(value);
+        this.reference.setString(value);
     }
 
-    public boolean equals(Object o) {
-    	if (this == o) {
-    		return true;
-    	}
-    	
-    	if (!(o instanceof XMLType)) {
-    		return false;
-    	}
-    	
-    	XMLType other = (XMLType)o;
-    	
-    	if (this.srcXML != null) {
-    		return this.srcXML.equals(other.srcXML);
-    	}
-    	
-    	return this.persistenceStreamId == other.persistenceStreamId
-				&& this.referenceStreamId == other.referenceStreamId;
-    }
-
-    public String toString() {
-        checkReference();
-        return srcXML.toString();
-    }
-        
-    private void checkReference() {
-        if (this.srcXML == null) {
-            throw new InvalidReferenceException(CorePlugin.Util.getString("XMLValue.InvalidReference")); //$NON-NLS-1$
-        }
-    }
-
 	public void free() throws SQLException {
-		checkReference();
-		this.srcXML.free();
+		this.reference.free();
 	}
 
 	public <T extends Result> T setResult(Class<T> resultClass)
 			throws SQLException {
-		checkReference();
-		return this.srcXML.setResult(resultClass);
+		return this.reference.setResult(resultClass);
 	}      
 }

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/basic/ClobToStringTransform.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -28,7 +28,6 @@
 
 import com.metamatrix.common.types.ClobType;
 import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
 import com.metamatrix.common.types.TransformationException;
 import com.metamatrix.core.CorePlugin;
 
@@ -65,9 +64,7 @@
             throw new TransformationException(e, CorePlugin.Util.getString("failed_convert", new Object[] {getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$            
         } catch(IOException e) {
             throw new TransformationException(e, CorePlugin.Util.getString("failed_convert", new Object[] {getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
-        } catch(InvalidReferenceException e) {
-            throw new TransformationException(e, CorePlugin.Util.getString("remote_lob_access")); //$NON-NLS-1$
-        }
+        } 
     }
 
     /**

Modified: trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java
===================================================================
--- trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/java/com/metamatrix/common/types/basic/SQLXMLToStringTransform.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -26,7 +26,6 @@
 import java.sql.SQLException;
 
 import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
 import com.metamatrix.common.types.TransformationException;
 import com.metamatrix.common.types.XMLType;
 import com.metamatrix.core.CorePlugin;
@@ -54,8 +53,6 @@
             return new String(result, 0, read);
         } catch (SQLException e) {
             throw new TransformationException(e, CorePlugin.Util.getString("failed_convert", new Object[] {getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$            
-        } catch(InvalidReferenceException e) {
-            throw new TransformationException(e, CorePlugin.Util.getString("remote_lob_access")); //$NON-NLS-1$
         } catch (IOException e) {
             throw new TransformationException(e, CorePlugin.Util.getString("failed_convert", new Object[] {getSourceType().getName(), getTargetType().getName()})); //$NON-NLS-1$
         }

Modified: trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties
===================================================================
--- trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/main/resources/com/metamatrix/core/i18n.properties	2009-08-12 17:39:14 UTC (rev 1235)
@@ -325,12 +325,8 @@
 BlobImpl.Invalid_start_position=The position to begin searching, "{0}", is not valid.
 
 
-BlobValue.isNUll=Blob object argument can not be null
-BlobValue.InvalidReference=Blob Contents are not available, use the Streaming interface to get the contents.
-ClobValue.isNUll=Clob object argument can not be null 
-ClobValue.InvalidReference=Clob Contents are not available, use the Streaming interface to get the contents.
-XMLValue.InvalidReference=XML Contents are not available, use the Streaming interface to get the contents.
-XMLValue.isNUll=SQLXML object argument can not be null
+Streamable.isNUll=Streamable object argument can not be null
+Streamable.InvalidReference=Streamable contents are not available, use the Streaming interface to get the contents.
 
 WorkerPool.New_thread=Created worker thread "{0}".
 WorkerPool.uncaughtException=Uncaught exception processing work

Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java
===================================================================
--- trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/test/java/com/metamatrix/common/types/TestBlobValue.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -68,12 +68,7 @@
         assertEquals(key, read.getReferenceStreamId());
         
         // and lost the original object
-        try {
-            read.getBinaryStream();
-            fail("this must thrown a reference stream exception"); //$NON-NLS-1$
-        } catch (InvalidReferenceException e) {
-            // pass
-        }
+        assertNull(read.getReference());
         
         saved.delete();
     }

Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java
===================================================================
--- trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/test/java/com/metamatrix/common/types/TestClobValue.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -68,12 +68,7 @@
         assertEquals(key, read.getReferenceStreamId());
         
         // and lost the original object
-        try {
-            read.getCharacterStream();
-            fail("this must thrown a reference stream exception"); //$NON-NLS-1$
-        } catch (InvalidReferenceException e) {
-            // pass
-        }
+        assertNull(read.getReference());
         
         saved.delete();
     }

Modified: trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java
===================================================================
--- trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/common-core/src/test/java/com/metamatrix/common/types/TestXMLValue.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -69,12 +69,7 @@
         assertEquals(pkey, read.getPersistenceStreamId());
         
         // and lost the original object
-        try {
-            read.getCharacterStream();
-            fail("this must thrown a reference stream exception"); //$NON-NLS-1$
-        } catch (InvalidReferenceException e) {
-            // pass
-        }
+        assertNull(read.getReference());
         
         saved.delete();
     }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -27,6 +27,7 @@
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.lob.LobChunk;
+import com.metamatrix.common.types.Streamable;
 
 /**
  * The buffer manager controls how memory is used and how data flows through 
@@ -268,4 +269,23 @@
      * to ensure that the memory can be freed. 
      */
     void releasePinnedBatches() throws MetaMatrixComponentException;
+    
+    /**
+     * Return the LOB associated with the referenceId
+     * @param id
+     * @param referenceId
+     * @return
+     * @throws TupleSourceNotFoundException
+     * @throws MetaMatrixComponentException
+     */
+    public Streamable<?> getStreamable(TupleSourceID id, String referenceId) 
+    throws TupleSourceNotFoundException, MetaMatrixComponentException;
+    
+    /**
+     * Assign the tuplesource as the persistent stream for the streamable
+     * @param id
+     * @param s
+     * @throws TupleSourceNotFoundException 
+     */
+    public void setPersistentTupleSource(TupleSourceID id, Streamable<? extends Object> s) throws TupleSourceNotFoundException;
 }

Copied: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java (from rev 1231, trunk/engine/src/main/java/com/metamatrix/common/lob/BufferManagerLobChunkStream.java)
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java	                        (rev 0)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerLobChunkStream.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.buffer;
+
+
+import java.io.IOException;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.lob.LobChunk;
+import com.metamatrix.common.lob.LobChunkProducer;
+import com.metamatrix.common.log.LogManager;
+import com.metamatrix.dqp.DQPPlugin;
+import com.metamatrix.dqp.util.LogConstants;
+
+public class BufferManagerLobChunkStream  implements LobChunkProducer {
+    private TupleSourceID sourceId;
+    private BufferManager bufferMgr;
+    private int position = 0;
+    
+    public BufferManagerLobChunkStream(String persitentId, BufferManager bufferMgr) {
+        this.sourceId = new TupleSourceID(persitentId);
+        this.bufferMgr = bufferMgr;
+    }
+    
+    public LobChunk getNextChunk() throws IOException {
+        try {
+            this.position++;
+            return bufferMgr.getStreamablePart(sourceId, position);
+        } catch (TupleSourceNotFoundException e) {
+            String msg = DQPPlugin.Util.getString("BufferManagerLobChunkStream.no_tuple_source", new Object[] {sourceId}); //$NON-NLS-1$
+            LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, msg); 
+            throw new IOException(msg);
+        } catch (MetaMatrixComponentException e) {
+            String msg = DQPPlugin.Util.getString("BufferManagerLobChunkStream.error_processing", new Object[] {sourceId}); //$NON-NLS-1$
+            LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, msg); 
+            throw new IOException(msg);
+        }                
+    }
+
+    /** 
+     * @see com.metamatrix.common.lob.LobChunkProducer#close()
+     */
+    public void close() throws IOException {
+        // we could remove the buffer tuple here but, this is just a stream, so we need to delete 
+        // that when we close th eplan.
+    }
+}
\ No newline at end of file

Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/LobTupleBatch.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer;
-
-import java.util.List;
-
-
-
-/** 
- * A marker class file holding the lob based data in a separate batch holder.
- */
-public class LobTupleBatch extends TupleBatch {
-    
-    public LobTupleBatch() {
-    }
-    
-    public LobTupleBatch(int beginRow, List listOfTupleLists) {
-        super(beginRow, listOfTupleLists);
-    }    
-}

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -25,7 +25,9 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -39,7 +41,6 @@
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.IndexedTupleSource;
-import com.metamatrix.common.buffer.LobTupleBatch;
 import com.metamatrix.common.buffer.MemoryNotAvailableException;
 import com.metamatrix.common.buffer.StorageManager;
 import com.metamatrix.common.buffer.TupleBatch;
@@ -50,23 +51,28 @@
 import com.metamatrix.common.types.Streamable;
 import com.metamatrix.core.log.MessageLevel;
 import com.metamatrix.core.util.Assertion;
+import com.metamatrix.dqp.DQPPlugin;
 import com.metamatrix.dqp.util.LogConstants;
 import com.metamatrix.query.execution.QueryExecPlugin;
 
 /**
  * <p>Default implementation of BufferManager.  This buffer manager implementation
  * assumes the usage of a StorageManager of type memory and optionally (preferred)
- * an additional StorageManager of type FILE or DISK.  If no persistent manager
- * is specified, everything managed by this BufferManager is assumed to fit in
- * memory.  This can be useful for testing or for small uses.</p>
- *
- * <p>Lots of state is cached in memory.  The tupleSourceMap contains a map of
- * TupleSourceID --> TupleSourceInfo.  Everything about a particular tuple
- * source is stored there.  The memoryState contains everything pertaining to
- * memory management.  The config contains all config info.</p>
+ * an additional StorageManager of type FILE.</p>
  */
 public class BufferManagerImpl implements BufferManager {
-    
+
+	//memory availability when reserveMemory() is called
+	static final int MEMORY_AVAILABLE = 1;
+	static final int MEMORY_EXCEED_MAX = 2; //exceed buffer manager max memory
+	static final int MEMORY_EXCEED_SESSION_MAX = 3; //exceed session max memory
+
+    private static ThreadLocal<Set<ManagedBatch>> PINNED_BY_THREAD = new ThreadLocal<Set<ManagedBatch>>() {
+    	protected Set<ManagedBatch> initialValue() {
+    		return new HashSet<ManagedBatch>();
+    	};
+    };
+
     // Initialized stuff
     private String lookup;
     private BufferConfig config;
@@ -76,15 +82,18 @@
     // groupName (String) -> TupleGroupInfo map
     private Map<String, TupleGroupInfo> groupInfos = new HashMap<String, TupleGroupInfo>();
 
-    // Storage managers
+    // Storage manager
     private StorageManager diskMgr;
 
     // ID creator
     private AtomicLong currentTuple = new AtomicLong(0);
 
-    // Memory management
-    private MemoryState memoryState;
-
+    // Keep track of how memory usage
+    private volatile long memoryUsed = 0;
+    
+    // Track the currently unpinned stuff in a sorted set
+    private Set<ManagedBatch> unpinned = Collections.synchronizedSet(new LinkedHashSet<ManagedBatch>());
+    
     // Trigger to handle management and stats logging
     private Timer timer;
 
@@ -94,6 +103,7 @@
     private AtomicInteger pinnedFromDisk = new AtomicInteger(0);
     private AtomicInteger cleanings = new AtomicInteger(0);
     private AtomicLong totalCleaned = new AtomicLong(0);
+    private AtomicInteger pinned = new AtomicInteger();
 
     /**
      * See {@link com.metamatrix.common.buffer.BufferManagerPropertyNames} for a
@@ -108,15 +118,13 @@
         // Set up config based on properties
         this.config = new BufferConfig(properties);
 
-        // Set up memory state object
-        this.memoryState = new MemoryState(config);
-
         // Set up alarms based on config
         if(this.config.getManagementInterval() > 0) {
             TimerTask mgmtTask = new TimerTask() {
                 public void run() {
-                    clean(0);
+                    clean(0, null);
                 }
+
             };
             getTimer().schedule(mgmtTask, 0, this.config.getManagementInterval());
         }
@@ -156,7 +164,8 @@
         BufferStats stats = new BufferStats();
         
         // Get memory info
-        this.memoryState.fillStats(stats);
+        stats.memoryUsed = this.memoryUsed;
+        stats.memoryFree = config.getTotalAvailableMemory() - memoryUsed;
         
         // Get picture of what's happening
         Set<TupleSourceID> copyKeys = tupleSourceMap.keySet();
@@ -245,11 +254,8 @@
         TupleSourceID newID = new TupleSourceID(String.valueOf(this.currentTuple.getAndIncrement()), this.lookup);
         TupleGroupInfo tupleGroupInfo = getGroupInfo(groupName);
 		TupleSourceInfo info = new TupleSourceInfo(newID, schema, types, tupleGroupInfo, tupleSourceType);
-		synchronized (tupleGroupInfo) {
-            tupleGroupInfo.getTupleSourceIDs().add(newID);
-		}
+        tupleGroupInfo.getTupleSourceIDs().add(newID);
 		tupleSourceMap.put(newID, info);
-
         if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
             LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, new Object[]{"Creating TupleSource:", newID, "of type "+tupleSourceType}); //$NON-NLS-1$ //$NON-NLS-2$
         }
@@ -289,31 +295,25 @@
                     ManagedBatch batch = iter.next();
                     switch(batch.getLocation()) {
                         case ManagedBatch.UNPINNED:
-                            memoryState.removeUnpinned(batch);
-                            memoryState.releaseMemory(batch.getSize(), info.getGroupInfo());
+                            this.unpinned.remove(batch);
+                            releaseMemory(batch.getSize(), info.getGroupInfo());
                             break;
                         case ManagedBatch.PINNED:
-                            memoryState.removePinned(info.getTupleSourceID(), batch.getBeginRow());
-                            memoryState.releaseMemory(batch.getSize(), info.getGroupInfo());
+                        	PINNED_BY_THREAD.get().remove(batch);
+                        	this.pinned.getAndDecrement();
+                            releaseMemory(batch.getSize(), info.getGroupInfo());
                             break;
                     }
                 }
             }
         }
         TupleGroupInfo tupleGroupInfo = info.getGroupInfo();
-        synchronized (tupleGroupInfo) {
-			tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
-		}
+		tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
         
         // Remove disk storage
         if (this.diskMgr != null){
             this.diskMgr.removeBatches(tupleSourceID);
         }
-        
-        // remove any dependent tuple sources on this tuple source
-        // lob frame work uses the parent tuple's name as group name to
-        // tie it back to the original source.
-        removeTupleSources(tupleSourceID.getStringID());
     }
 
     /**
@@ -338,28 +338,25 @@
         	return;
         }
         List<TupleSourceID> tupleSourceIDs = null;
-        synchronized (tupleGroupInfo) {
+        synchronized (tupleGroupInfo.getTupleSourceIDs()) {
 			tupleSourceIDs = new ArrayList<TupleSourceID>(tupleGroupInfo.getTupleSourceIDs());
 		}
-        // Remove them
-        if(tupleSourceIDs.size() > 0) {
-	        MetaMatrixComponentException ex = null;
+        MetaMatrixComponentException ex = null;
 
-	        for (TupleSourceID tsID : tupleSourceIDs) {
-	     		try {
-	     			this.removeTupleSource(tsID);
-	     		} catch(TupleSourceNotFoundException e) {
-	     			// ignore and go on
-	     		} catch(MetaMatrixComponentException e) {
-	     			if(ex == null) {
-	     				ex = e;
-	     			}
-	     		}
-	        }
+        for (TupleSourceID tsID : tupleSourceIDs) {
+     		try {
+     			this.removeTupleSource(tsID);
+     		} catch(TupleSourceNotFoundException e) {
+     			// ignore and go on
+     		} catch(MetaMatrixComponentException e) {
+     			if(ex == null) {
+     				ex = e;
+     			}
+     		}
+        }
 
-            if(ex != null) {
-            	throw ex;
-            }
+        if(ex != null) {
+        	throw ex;
         }
     }
 
@@ -466,7 +463,7 @@
         // if there are lobs in source then we need to keep manage then
         // in a separate tuple sources.
         if (info.lobsInSource()) {
-            createTupleSourcesForLobs(tupleSourceID, tupleBatch);
+            correctLobReferences(info, tupleBatch);
         }        
 
         // Determine where to store
@@ -474,13 +471,13 @@
         tupleBatch.setSize(bytes);
         short location = ManagedBatch.PERSISTENT;
                 
-        if(memoryState.reserveMemory(bytes, info.getGroupInfo()) == MemoryState.MEMORY_AVAILABLE) {
+        if(reserveMemory(bytes, info.getGroupInfo()) == BufferManagerImpl.MEMORY_AVAILABLE) {
             location = ManagedBatch.UNPINNED;
         }
 
         synchronized(info) {
             if(info.isRemoved()) {
-                memoryState.releaseMemory(bytes, info.getGroupInfo());
+                releaseMemory(bytes, info.getGroupInfo());
                 throw new TupleSourceNotFoundException(QueryExecPlugin.Util.getString("BufferManagerImpl.tuple_source_not_found", tupleSourceID)); //$NON-NLS-1$
             }
 
@@ -498,14 +495,14 @@
             } catch(MetaMatrixComponentException e) {
                 // If we were storing to memory, clean up memory we reserved
                 if(location != ManagedBatch.PERSISTENT) {
-                    memoryState.releaseMemory(bytes, info.getGroupInfo());
+                    releaseMemory(bytes, info.getGroupInfo());
                 }
                 throw e;
             }
             
             // Add to memory state if in memory
             if(location == ManagedBatch.UNPINNED) {
-                this.memoryState.addUnpinned(managedBatch);
+            	this.unpinned.add(managedBatch);
             }
 
             // Update info with new rows
@@ -533,18 +530,39 @@
 
         this.pinRequests.incrementAndGet();
 
-        TupleBatch memoryBatch = null;
+        TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
+        int memoryAvailability = BufferManagerImpl.MEMORY_AVAILABLE;
+
+        ManagedBatch mbatch = null;
+        synchronized (info) {
+        	mbatch = info.getBatch(beginRow);
+        }
+        if(mbatch == null) {
+            return new TupleBatch(beginRow, Collections.EMPTY_LIST);
+        } 
+
         int endRow = 0;
-        int count = 0;
         int pass = 0;
+        //if the client request previous batch, the end row 
+        //is smaller than the begin row
+        if(beginRow > maxEndRow) {
+            endRow = Math.min(beginRow, mbatch.getEndRow());
+            beginRow = Math.max(maxEndRow, mbatch.getBeginRow());
+        }else {
+            endRow = Math.min(maxEndRow, mbatch.getEndRow());
+        }
+        int count = endRow - beginRow + 1;
+        if (count == 0) {
+        	return new TupleBatch(beginRow, Collections.EMPTY_LIST);
+        }
+        long memoryRequiredByBatch = mbatch.getSize();
 
-        TupleSourceInfo info = getTupleSourceInfo(tupleSourceID, true);
-        long memoryRequiredByBatch = 0;
-        int memoryAvailability = MemoryState.MEMORY_AVAILABLE;
+        TupleBatch memoryBatch = null;
+
         while(pass < 2) {
             if(pass == 1) {
-                if(memoryAvailability == MemoryState.MEMORY_EXCEED_MAX) {
-                    clean(memoryRequiredByBatch);
+                if(memoryAvailability == BufferManagerImpl.MEMORY_EXCEED_MAX) {
+                    clean(memoryRequiredByBatch, null);
                 }else {
                     //exceed session limit
                     clean(memoryRequiredByBatch, info.getGroupInfo()); 
@@ -552,28 +570,23 @@
             }
             
             synchronized(info) {
-                ManagedBatch mbatch = info.getBatch(beginRow);
-                if(mbatch == null) {
-                    return new TupleBatch(beginRow, Collections.EMPTY_LIST);
-    
-                } else if(mbatch.getLocation() == ManagedBatch.PINNED) {
+                if(mbatch.getLocation() == ManagedBatch.PINNED) {
                     // Load batch from memory - already pinned
                     memoryBatch = mbatch.getBatch();
     
                 } else if(mbatch.getLocation() == ManagedBatch.UNPINNED) {
                     // Already in memory - just move from unpinned to pinned
-                    mbatch.setLocation(ManagedBatch.PINNED);
-                    this.memoryState.removeUnpinned(mbatch);
-                    this.memoryState.addPinned(mbatch);
+                    this.unpinned.remove(mbatch);
+                    pin(mbatch);
     
                     // Load batch from memory
                     memoryBatch = mbatch.getBatch();
                                                 
-                } else if(mbatch.getLocation() == ManagedBatch.PERSISTENT) {
+                } else {
                     memoryRequiredByBatch = mbatch.getSize();
                     
                     // Try to reserve some memory
-                    if((memoryAvailability = memoryState.reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) != MemoryState.MEMORY_AVAILABLE) {
+                    if((memoryAvailability = reserveMemory(memoryRequiredByBatch, info.getGroupInfo())) != BufferManagerImpl.MEMORY_AVAILABLE) {
                         if(pass == 0) {
                             // Break and try to clean - it is important to break out of the synchronized block
                             // here so that the clean does not cause a deadlock on this TupleSourceInfo
@@ -595,22 +608,11 @@
                     memoryBatch = diskMgr.getBatch(tupleSourceID, internalBeginRow, info.getTypes());
     
                     mbatch.setBatch(memoryBatch);
-                    mbatch.setLocation(ManagedBatch.PINNED);
-                    this.memoryState.addPinned(mbatch);
+                    if (info.lobsInSource()) {
+                    	correctLobReferences(info, memoryBatch);
+                    }
+                    pin(mbatch);
                 }
-                
-                //if the client request previous batch, the end row 
-                //is smaller than the begin row
-                if(beginRow > maxEndRow) {
-                    endRow = Math.min(beginRow, memoryBatch.getEndRow());
-                    beginRow = Math.max(maxEndRow, memoryBatch.getBeginRow());
-                }else {
-                    endRow = Math.min(maxEndRow, memoryBatch.getEndRow());
-                }
-                count = endRow - beginRow + 1;
-                if(count > 0) {
-                    mbatch.pin();
-                }
             }
             
             break;
@@ -618,7 +620,7 @@
 
         // Batch should now be pinned in memory, so grab it and build a correctly
         // sized batch to return
-        if(memoryBatch.getRowCount() == 0 || count == 0 || (beginRow == memoryBatch.getBeginRow() && count == memoryBatch.getRowCount())) {
+        if(beginRow == memoryBatch.getBeginRow() && count == memoryBatch.getRowCount()) {
             return memoryBatch;
         }
 
@@ -628,6 +630,15 @@
         System.arraycopy(memoryRows, firstOffset, rows, 0, count);
         return new TupleBatch(beginRow, rows);
     }
+    
+	private void pin(ManagedBatch mbatch) {
+		mbatch.setLocation(ManagedBatch.PINNED);
+		PINNED_BY_THREAD.get().add(mbatch);
+		if (!mbatch.hasPinnedRows()) {
+			pinned.getAndIncrement();
+		}
+		mbatch.pin();
+	}
 
     /**
      * Unpin a tuple source batch.
@@ -655,8 +666,9 @@
             // Determine whether batch itself should be unpinned
             if(! mbatch.hasPinnedRows()) {
                 mbatch.setLocation(ManagedBatch.UNPINNED);
-                memoryState.removePinned(tupleSourceID, mbatch.getBeginRow());
-                memoryState.addUnpinned(mbatch);
+                PINNED_BY_THREAD.get().remove(mbatch);
+                this.unpinned.add(mbatch);
+                pinned.getAndDecrement();
             }
         }
     }
@@ -698,83 +710,71 @@
         }
         return info;
     }
-
+    
     /**
-     * Clean the memory state, using LRU.  This can be done either via the background
-     * cleaning thread or actively if someone wants memory and none is free.
+     * This can be done actively if someone wants memory and none is free.
      */
-    protected void clean(long memoryRequired) {
-        // Defect 14573 - this method needs to know how much memory is required, so that (even if we're not past the active memory
-        // threshold) if the memory available is less than the memory required, we should clean up unpinned batches.
+    protected void clean(long memoryRequired, TupleGroupInfo targetGroupInfo) {
+    	cleanLobTupleSource();
+    	
+    	long released = 0;
+
         long targetLevel = config.getActiveMemoryLevel();
         long totalMemory = config.getTotalAvailableMemory();
-        long released = 0;
-
-        Iterator<ManagedBatch> unpinnedIter = this.memoryState.getAllUnpinned();
-        while(unpinnedIter.hasNext() && // If there are unpinned batches in memory, AND
-              // Defect 14573 - if we require more than what's available, then cleanup regardless of the threshold
-              (memoryRequired > totalMemory - memoryState.getMemoryUsed() || // if the memory needed is more than what's available, or
-               memoryState.getMemoryUsed() > targetLevel)){ // if we've crossed the active memory threshold, then cleanup
-            
-            ManagedBatch batch = unpinnedIter.next();
+        
+        boolean generalCleaningDone = false;
+        List<ManagedBatch> toClean = null;
+        synchronized (unpinned) {
+        	//TODO: re-implement without having to scan and compete
+			toClean = new ArrayList<ManagedBatch>(unpinned);
+		}
+        for (ManagedBatch batch : toClean) {
             TupleSourceID tsID = batch.getTupleSourceID();
-
-            released += releaseMemory(batch, tsID);
-        }
-
-        if(released > 0) {
-            this.cleanings.incrementAndGet();
-            this.totalCleaned.addAndGet(released);
-        }
-    
-    }
-    
-    /**
-     * Over memory limit for this session. Clean the memory for this session.
-     * Clean the memory state, using LRU.  This can be done actively if someone wants memory and none is free.
-     */
-    protected void clean(long memoryRequired, TupleGroupInfo targetGroupInfo) throws TupleSourceNotFoundException{
-        boolean cleanForSessionSucceeded = false;
-        long released = 0;
-
-        Iterator<ManagedBatch> unpinnedIter = this.memoryState.getAllUnpinned();
-        while(unpinnedIter.hasNext()) {
-            ManagedBatch batch = unpinnedIter.next();
-            TupleSourceID tsID = batch.getTupleSourceID();
-            TupleSourceInfo tsInfo = getTupleSourceInfo(tsID, false);
+            TupleSourceInfo tsInfo = this.tupleSourceMap.get(tsID);
             if(tsInfo == null) {
                 //may be removed by another thread
                 continue;
             }
-            if(!tsInfo.getGroupInfo().equals(targetGroupInfo)) {
-                //continue if they are not the same tuple group
-                continue;
+
+            long currentMemoryUsed = memoryUsed;
+            if (!generalCleaningDone && (memoryRequired <= totalMemory - currentMemoryUsed && // if the memory needed is more than what's available, or
+                currentMemoryUsed <= targetLevel)) { // if we've crossed the active memory threshold, then cleanup
+            	generalCleaningDone = true;
             }
+            if (generalCleaningDone) {
+            	if (targetGroupInfo == null) {
+            		break;
+            	}
+                if (targetGroupInfo == tsInfo.getGroupInfo()) {
+    	            //if the memory needed is more than what is available for the session, then cleanup. Otherwise, break the loop.
+    	            if(memoryRequired <= config.getMaxAvailableSession() - targetGroupInfo.getGroupMemoryUsed()) {
+    	            	break;
+    	            }                
+                } 
+            }
             
-            long groupMemoryUsed = memoryState.getGroupMemoryUsed(targetGroupInfo);
-            //if the memory needed is more than what is available for the session, then cleanup. Otherwise, break the loop.
-            if(memoryRequired <= config.getMaxAvailableSession() - groupMemoryUsed) {
-                cleanForSessionSucceeded = true;
-                break;
-            }                
-            
-            released += releaseMemory(batch, tsID);
+            released += releaseMemory(batch, tsInfo);
         }
 
         if(released > 0) {
             this.cleanings.incrementAndGet();
             this.totalCleaned.addAndGet(released);
         }
-        
-        if(!cleanForSessionSucceeded) {
-            //if we cannot clean enough memory for this session, it fails
-            return;
+    }
+
+    //TODO: run asynch
+	private void cleanLobTupleSource() {
+		String tupleSourceId = TupleSourceInfo.getStaleLobTupleSource();
+		if (tupleSourceId != null) {
+        	try {
+				removeTupleSource(new TupleSourceID(tupleSourceId));
+			} catch (TupleSourceNotFoundException e) {
+			} catch (MetaMatrixComponentException e) {
+				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, e, "Exception removing stale lob tuple source"); //$NON-NLS-1$
+			}
         }
+	}
         
-        //make sure it is not over the buffer manager memory limit
-        clean(memoryRequired);
-    }
-    
     /**
      * Release the memory for the given unpinned batch.
      * @param batch Batch to be released from memory
@@ -782,48 +782,32 @@
      * @return The size of memory released in bytes
      * @since 4.3
      */
-    private long releaseMemory(ManagedBatch batch, TupleSourceID tsID) {
+    private long releaseMemory(ManagedBatch batch, TupleSourceInfo info) {
         // Find info and lock on it
-        try {
-            TupleSourceInfo info = getTupleSourceInfo(tsID, false);
-            if(info == null) {
+        synchronized(info) {
+            if(info.isRemoved() || batch.getLocation() != ManagedBatch.UNPINNED) {
                 return 0;
             }
 
-            synchronized(info) {
-                if(info.isRemoved()) {
-                    return 0;
-                }
+            // This batch is still unpinned - move to persistent storage
+            TupleBatch dataBatch = batch.getBatch();
 
-                // Re-get the batch and check that it still exists and is unpinned
-                batch = info.getBatch(batch.getBeginRow());
-                if(batch == null || batch.getLocation() != ManagedBatch.UNPINNED) {
-                    return 0;
-                }
+            try {
+                diskMgr.addBatch(info.getTupleSourceID(), dataBatch, info.getTypes());
+            } catch(MetaMatrixComponentException e) {
+                // Can't move
+                return 0;
+            }
 
-                // This batch is still unpinned - move to persistent storage
-                TupleBatch dataBatch = batch.getBatch();
+            batch.setBatch(null);
 
-                try {
-                    diskMgr.addBatch(tsID, dataBatch, info.getTypes());
-                } catch(MetaMatrixComponentException e) {
-                    // Can't move
-                    return 0;
-                }
+            // Update memory
+            batch.setLocation(ManagedBatch.PERSISTENT);
+            this.unpinned.remove(batch);
+            releaseMemory(batch.getSize(), info.getGroupInfo());
 
-                batch.setBatch(null);
-
-                // Update memory
-                batch.setLocation(ManagedBatch.PERSISTENT);
-                memoryState.removeUnpinned(batch);
-                memoryState.releaseMemory(batch.getSize(), info.getGroupInfo());
-
-                return batch.getSize();
-            }
-        } catch(TupleSourceNotFoundException e) {
-            // ignore, go to next batch
-            return 0;
-        }    
+            return batch.getSize();
+        }
     }
     
     /**
@@ -855,114 +839,49 @@
     }
     
     /**
-     * If a tuple batch is being added with Lobs, then maintain the LOB
-     * objects in a separate TupleSource than from the original, so that
-     * the original can only serilize the id, but the otherone can serialize 
-     * the contents. 
+     * If a tuple batch is being added with Lobs, then references to
+     * the lobs will be held on the {@link TupleSourceInfo} 
      * @param batch
      */
-    private void createTupleSourcesForLobs(TupleSourceID parentId, TupleBatch batch) 
-        throws MetaMatrixComponentException, TupleSourceNotFoundException {
-
-        TupleSourceInfo info = getTupleSourceInfo(parentId, false);
+    @SuppressWarnings("unchecked")
+	private void correctLobReferences(TupleSourceInfo info, TupleBatch batch) {
         List parentSchema = info.getTupleSchema();        
         List[] rows = batch.getAllTuples();
-        
+        int columns = parentSchema.size();
         // walk through the results and find all the lobs
         for (int row = 0; row < rows.length; row++) {
-            
-            int col = 0;
-            for (Iterator i = rows[row].iterator(); i.hasNext();) {                                                
-                Object anObj = i.next();
+            for (int col = 0; col < columns; col++) {                                                
+                Object anObj = rows[row].get(col);
                 
-                if (anObj instanceof Streamable) {                                
-                    // once lob is found check to see if this has already been assigned
-                    // a streming id or not; if one is not assigned create one and assign it
-                    // to the lob; if one is already assigned just return; 
-                    // this will prohibit calling lob on itself into this routine.
-                    Streamable lob = (Streamable)anObj;                  
-                    
-                    if (lob.getReferenceStreamId() == null || lobIsNotKnownInTupleSourceMap( lob, parentId) ) {                        
-                        List schema = new ArrayList();
-                        schema.add(parentSchema.get(col));
-                        
-                        TupleSourceID id = createTupleSource(schema, new String[] {info.getTypes()[col]}, parentId.getStringID(), TupleSourceType.PROCESSOR);
-                        lob.setReferenceStreamId(id.getStringID());
-                        
-                        List results = new ArrayList();
-                        results.add(lob);
-                        
-                        List listOfRows = new ArrayList();
-                        listOfRows.add(results);
-                        
-                        // these batches are wrapped in a special marker batch tag
-                        // which are saved from forcing them to disk.
-                        LobTupleBatch separateBatch = new LobTupleBatch(1, listOfRows);
-                        separateBatch.setTerminationFlag(true);
-                        
-                        // now save this as separate tuple source.
-                        addTupleBatch(id, separateBatch);
-                    } else {                        
-                        // this means the XML object being moved from one tuple to another tuple
-                        // i.e. one plan to another plan. So update the group info.
-
-                        // First update the reference tuple source.
-                        if (!lob.getReferenceStreamId().equals(parentId.getStringID())) {
-                        	TupleGroupInfo groupInfo = getGroupInfo(parentId.getStringID());
-                            TupleSourceID id = new TupleSourceID(lob.getReferenceStreamId());
-                            TupleSourceInfo lobInfo = getTupleSourceInfo(id, false);
-                            reassignGroup(groupInfo, lobInfo);
-                            
-                            // if the lob moving parent has a assosiated persistent
-                            // tuple source, then move that one to same parent too.
-                            if (lob.getPersistenceStreamId() != null) {
-                                id = new TupleSourceID(lob.getPersistenceStreamId());
-                                lobInfo = getTupleSourceInfo(id, false);
-                                reassignGroup(groupInfo, lobInfo);                                
-                            }                            
-                        }
-                    }
+                if (!(anObj instanceof Streamable<?>)) {
+                	continue;
                 }
-                col++;
+                Streamable lob = (Streamable)anObj;                  
+                info.addLobReference(lob);
+                if (lob.getReference() == null) {
+                	lob.setReference(info.getLobReference(lob.getReferenceStreamId()).getReference());
+                }
             }
         }
     }
-
-	private void reassignGroup(TupleGroupInfo groupInfo, TupleSourceInfo lobInfo) {
-		TupleGroupInfo tupleGroupInfo = lobInfo.getGroupInfo();
-		synchronized (tupleGroupInfo) {
-			tupleGroupInfo.getTupleSourceIDs().remove(lobInfo.getTupleSourceID());
-		}
-		lobInfo.setGroupInfo(groupInfo);
-		synchronized (groupInfo) {
-			groupInfo.getTupleSourceIDs().add(lobInfo.getTupleSourceID());
-		}
-	}
     
-    private boolean lobIsNotKnownInTupleSourceMap( Streamable lob, TupleSourceID parentId) throws TupleSourceNotFoundException {
-        /*
-         * The need for this defensive feature arises because there are multiple uses of the TupleSourceMap which
-         * are somewhat inconsistent with one another.  In the case of LOBs we use the parent/child group feature
-         * of tuplesources to associate a parent tuplesource containing metadata about the LOB with a second
-         * tuplesource that contains the LOB.  When such a group is no longer needed  (for example, see SubqueryProcessorUtility.close()),
-         * removing the child tupleSources has the unfortunate side effect of leaving the actual LOBs with references to
-         * tuplesources that no longer exist, and are therefore no longer in the tupleSourceMap.
-         * 
-         * This test ensures that such orphaned LOBs will be treated correctly (TEIID-54).
-         * 
-         */
-        if (!lob.getReferenceStreamId().equals(parentId.getStringID())) {
-            TupleSourceID id = new TupleSourceID(lob.getReferenceStreamId());
-            TupleSourceInfo lobInfo = getTupleSourceInfo(id, false);
-
-            if ( lobInfo == null ) {
-                return true; // is not known
-            }
-            return false;   // is known
-        }        
-        return false;   // don't care if known
+    @Override
+    public Streamable<?> getStreamable(TupleSourceID id, String referenceId) throws TupleSourceNotFoundException, MetaMatrixComponentException {
+    	TupleSourceInfo tsInfo = getTupleSourceInfo(id, true);
+    	Streamable<?> s = tsInfo.getLobReference(referenceId);
+    	if (s == null) {
+    		throw new MetaMatrixComponentException(DQPPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
+    	}
+    	return s;
     }
     
+    @Override
+    public void setPersistentTupleSource(TupleSourceID id, Streamable<?> s) throws TupleSourceNotFoundException {
+    	cleanLobTupleSource();
+    	TupleSourceInfo tsInfo = getTupleSourceInfo(id, true);
+    	s.setPersistenceStreamId(id.getStringID());
+    	tsInfo.setContainingLobReference(s);
+    }
 
     /**  
      * @see com.metamatrix.common.buffer.BufferManager#addStreamablePart(com.metamatrix.common.buffer.TupleSourceID, com.metamatrix.common.lob.LobChunk, int)
@@ -976,7 +895,7 @@
         
         synchronized(info) {
 
-            List data = new ArrayList();
+            List<LobChunk> data = new ArrayList<LobChunk>();
             data.add(streamChunk);
             TupleBatch batch = new TupleBatch(beginRow, new List[] {data});
             this.diskMgr.addBatch(tupleSourceID, batch, info.getTypes());                
@@ -1008,44 +927,62 @@
      * @see com.metamatrix.common.buffer.BufferManager#releasePinnedBatches()
      */
     public void releasePinnedBatches() throws MetaMatrixComponentException {
-        Map<TupleSourceID, Map<Integer, ManagedBatch>> threadPinned = memoryState.getPinnedByCurrentThread();
-        if (threadPinned == null) {
-            return;
-        }
-        for (Iterator<Map.Entry<TupleSourceID, Map<Integer, ManagedBatch>>> i = threadPinned.entrySet().iterator(); i.hasNext();) {
-            Map.Entry<TupleSourceID, Map<Integer, ManagedBatch>> entry = i.next();
-            i.remove();
-            TupleSourceID tsid = entry.getKey();
-            Map<Integer, ManagedBatch> pinnedBatches = entry.getValue();
-            try {
-                for (Iterator<ManagedBatch> j = pinnedBatches.values().iterator(); j.hasNext();) {
-                    ManagedBatch batch = j.next();
-                    
-                    //TODO: add trace logging about the batch that is being unpinned
-                    unpinTupleBatch(tsid, batch.getBeginRow(), batch.getEndRow());
-                }
-            } catch (TupleSourceNotFoundException err) {
-                continue;
-            }
-        }
+    	MetaMatrixComponentException e = null;
+    	List<ManagedBatch> pinnedByThread = new ArrayList<ManagedBatch>(PINNED_BY_THREAD.get());
+    	for (ManagedBatch managedBatch : pinnedByThread) {
+    		try {
+    			//TODO: add trace logging about the batch that is being unpinned
+    			unpinTupleBatch(managedBatch.getTupleSourceID(), managedBatch.getBeginRow(), managedBatch.getEndRow());
+    		} catch (TupleSourceNotFoundException err) {
+    		} catch (MetaMatrixComponentException err) {
+    			e = err;
+    		}
+		}
+    	if (e != null) {
+    		throw e;
+    	}
     }
     
     /**
-     * for testing purposes 
+     * Check for whether the specified amount of memory can be reserved,
+     * and if so reserve it.  This is done in the same method so that the
+     * memory is not taken away by a different thread between checking and 
+     * reserving - standard "test and set" behavior. 
+     * @param bytes Bytes requested
+     * @return One of MEMORY_AVAILABLE, MEMORY_EXCEED_MAX, or MEMORY_EXCEED_SESSION_MAX
      */
-    public int getPinnedCount() {
-        Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned = memoryState.getAllPinned();
-        
-        int count = 0;
-        
-        if (pinned == null) {
-            return count;
+    private synchronized int reserveMemory(long bytes, TupleGroupInfo groupInfo) {
+        //check session limit first
+        long sessionMax = config.getMaxAvailableSession();
+        if(sessionMax - groupInfo.getGroupMemoryUsed() < bytes) {
+            return BufferManagerImpl.MEMORY_EXCEED_SESSION_MAX;
         }
         
-        for (Iterator<Map<Integer, ManagedBatch>> i = pinned.values().iterator(); i.hasNext();) {
-            count += i.next().size();
+        //then check the total memory limit
+        long max = config.getTotalAvailableMemory();
+        if(max - memoryUsed < bytes) {
+            return BufferManagerImpl.MEMORY_EXCEED_MAX;
         }
         
-        return count;
+        groupInfo.reserveMemory(bytes);
+        memoryUsed += bytes;
+        
+        return BufferManagerImpl.MEMORY_AVAILABLE;
     }
+
+    /**
+     * Release memory 
+     * @param bytes Bytes to release
+     */
+    private synchronized void releaseMemory(long bytes, TupleGroupInfo groupInfo) {
+        groupInfo.releaseMemory(bytes);
+        memoryUsed -= bytes;
+    }
+    
+    /**
+     * for testing purposes 
+     */
+    public int getPinnedCount() {
+    	return pinned.get();
+    }
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferStats.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -54,7 +54,7 @@
     public long totalCleaned;
 
     // Pinned batch details
-    public List pinnedManagedBatches = new LinkedList();
+    public List<ManagedBatch> pinnedManagedBatches = new LinkedList<ManagedBatch>();
     
     /**
      * Constructor for BufferStats.
@@ -90,7 +90,7 @@
         LogManager.logInfo(LogConstants.CTX_BUFFER_MGR, "    avgCleaned = " + avgCleaned);         //$NON-NLS-1$
 
         if ( LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE) ) {
-            HashMap stackTraces = new HashMap();
+            HashMap<List<String>, Integer> stackTraces = new HashMap<List<String>, Integer>();
             
             if ( pinnedManagedBatches.isEmpty() ) {
                 return;
@@ -102,12 +102,10 @@
             int stackNumber = 1;
     
             // pinned batch details
-            Iterator it = pinnedManagedBatches.iterator();
-            while ( it.hasNext() ) {
-                ManagedBatch batch = (ManagedBatch)it.next();
+            for (ManagedBatch batch : pinnedManagedBatches) {
                 LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "    TupleSourceID: " + batch.getTupleSourceID() + " Begin: " + batch.getBeginRow() + " End: " + batch.getEndRow()); //$NON-NLS-1$  //$NON-NLS-2$  //$NON-NLS-3$
                 
-                Integer stackKey = (Integer)stackTraces.get(batch.getCallStack());
+                Integer stackKey = stackTraces.get(batch.getCallStack());
                 
                 boolean isFirst = false;
                 
@@ -119,7 +117,7 @@
                 
                 LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "        Pinned at: " + batch.getCallStackTimeStamp() + " by call# " + stackKey); //$NON-NLS-1$ //$NON-NLS-2$ 
                 if (isFirst) {
-                    for (Iterator j = batch.getCallStack().iterator(); j.hasNext();) {
+                    for (Iterator<String> j = batch.getCallStack().iterator(); j.hasNext();) {
                         LogManager.logTrace( LogConstants.CTX_BUFFER_MGR, "        " + j.next() );         //$NON-NLS-1$                
                     }
                 }

Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -1,260 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer.impl;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.metamatrix.common.buffer.TupleSourceID;
-import com.metamatrix.common.log.LogManager;
-import com.metamatrix.core.log.MessageLevel;
-import com.metamatrix.dqp.util.LogConstants;
-
-/**
- * <p>This class represents the memory state of the BufferManagerImpl.  The 
- * critical thing to know is what batches are in memory but not being 
- * used and what batches in memory but being used.  The access patterns 
- * for these two types of information are very different, so they are stored
- * in very different data structures.</p>
- * 
- * <p>The unpinned batches are stored in a linked list, ordered by a least
- * recently used timestamp.  This is optimized for cleanup.  It's not very good 
- * for finding stuff but fortunately, we don't need to most of the time.</p>
- * 
- * <p>The pinned batches are stored in a Map, keyed by TupleSourceID.  The value 
- * is another map, keyed by beginRow, with a ManagedBatch as the value.  This 
- * is really good for finding batches quickly, which is exactly what we need 
- * to do with pinned batches.</p>
- * 
- * <p>All methods on this class are synchronized to preserve state.</p>
- */
-class MemoryState {
-    
-    private static ThreadLocal<Map<TupleSourceID, Map<Integer, ManagedBatch>>> PINNED_BY_THREAD = new ThreadLocal<Map<TupleSourceID, Map<Integer, ManagedBatch>>>();
-    
-    //memory availability when reserveMemory() is called
-    static final int MEMORY_AVAILABLE = 1;
-    static final int MEMORY_EXCEED_MAX = 2; //exceed buffer manager max memory
-    static final int MEMORY_EXCEED_SESSION_MAX = 3; //exceed session max memory
-
-    // Configuration, used to get available memory info
-    private BufferConfig config;
-
-    // Keep track of how memory we are using
-    private volatile long memoryUsed = 0;
-
-    // Track the currently pinned stuff by TupleSourceID for easy lookup
-    private Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned = new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>();     
-    
-    // Track the currently unpinned stuff in a sorted list, sorted by access time
-    private Set<ManagedBatch> unpinned = Collections.synchronizedSet(new LinkedHashSet<ManagedBatch>());
-    
-    /**
-     * Constructor for MemoryState, based on config.
-     * @param config Configuration
-     */
-    public MemoryState(BufferConfig config) {
-        this.config = config;
-    }
-
-    /**
-     * Fill the stats object with stats about memory
-     * @param stats Stats info to be filled
-     */
-    public void fillStats(BufferStats stats) {
-        stats.memoryUsed = this.memoryUsed;
-        stats.memoryFree = config.getTotalAvailableMemory() - memoryUsed;
-    }
-
-    /**
-     * Get the amount of memory currently being used in bytes
-     * @return Used memory, in bytes
-     */
-    public long getMemoryUsed() {
-        return this.memoryUsed;    
-    }
-
-    /**
-     * Check for whether the specified amount of memory can be reserved,
-     * and if so reserve it.  This is done in the same method so that the
-     * memory is not taken away by a different thread between checking and 
-     * reserving - standard "test and set" behavior. 
-     * @param bytes Bytes requested
-     * @return One of MEMORY_AVAILABLE, MEMORY_EXCEED_MAX, or MEMORY_EXCEED_SESSION_MAX
-     */
-    public synchronized int reserveMemory(long bytes, TupleGroupInfo groupInfo) {
-        //check session limit first
-        long sessionMax = config.getMaxAvailableSession();
-        if(sessionMax - groupInfo.getGroupMemoryUsed() < bytes) {
-            return MEMORY_EXCEED_SESSION_MAX;
-        }
-        
-        //then check the total memory limit
-        long max = config.getTotalAvailableMemory();
-        if(max - memoryUsed < bytes) {
-            return MEMORY_EXCEED_MAX;
-        }
-        
-        /* NOTE1
-         * Since the groupInfo call is being made in the synchronized block for the entire buffer,
-         * groupInfo doesn't need additional locking.
-         */
-        groupInfo.reserveMemory(bytes);
-        memoryUsed += bytes;
-        
-        return MEMORY_AVAILABLE;
-    }
-
-    /**
-     * Release memory 
-     * @param bytes Bytes to release
-     */
-    public synchronized void releaseMemory(long bytes, TupleGroupInfo groupInfo) {
-        // see NOTE1
-        groupInfo.releaseMemory(bytes);
-        memoryUsed -= bytes;
-    }
-    
-    /**
-     * Get the amount of memory currently being used for the specified group in bytes 
-     * @param groupInfo TupleGroupInfo
-     * @return Used memory, in bytes
-     */
-    public synchronized long getGroupMemoryUsed(TupleGroupInfo groupInfo) {
-        // see NOTE1
-        return groupInfo.getGroupMemoryUsed();
-    }
-    
-    /**
-     * Add a pinned batch
-     * @param batch Pinned batch to add
-     */
-    public void addPinned(ManagedBatch batch) {
-        synchronized (this) {
-            addPinnedInternal(pinned, batch);
-        }
-        Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned = PINNED_BY_THREAD.get();
-        if (theadPinned == null) {
-            theadPinned = new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>();
-            PINNED_BY_THREAD.set(theadPinned);
-        }
-        addPinnedInternal(theadPinned, batch);
-        if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE )) {
-            batch.captureCallStack();
-        }
-    }
-
-    private void addPinnedInternal(Map<TupleSourceID, Map<Integer, ManagedBatch>> pinnedMap, ManagedBatch batch) {
-        TupleSourceID tsID = batch.getTupleSourceID();
-        Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
-        if(tsPinned == null) {
-            tsPinned = new HashMap<Integer, ManagedBatch>();
-            pinnedMap.put(tsID, tsPinned);
-        } 
-        
-        // Add batch, indexed by beginRow
-        tsPinned.put(new Integer(batch.getBeginRow()), batch);
-    }
-    
-    /**
-     * Remove a pinned batch, if not found do nothing and return null
-     * @param tsID Tuple source id
-     * @param beginRow Beginning row
-     * @return Removed batch or null if not found
-     */
-    public ManagedBatch removePinned(TupleSourceID tsID, int beginRow) {
-        ManagedBatch result = null;
-        synchronized (this) {
-            result = removePinnedInternal(pinned, tsID, beginRow);
-        }
-        if (result != null) {
-        	Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned = PINNED_BY_THREAD.get();
-            if (theadPinned != null) {
-                removePinnedInternal(theadPinned, tsID, beginRow);
-            }
-        }
-        return result;
-    }
-
-    private ManagedBatch removePinnedInternal(Map<TupleSourceID, Map<Integer, ManagedBatch>> pinnedMap, TupleSourceID tsID,
-                                              int beginRow) {
-        Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
-        if(tsPinned != null) { 
-            ManagedBatch mbatch = tsPinned.remove(new Integer(beginRow));
-            
-            if(tsPinned.size() == 0) { 
-                pinnedMap.remove(tsID);
-            }
-            
-            return mbatch; 
-        }
-        return null;
-    }
-
-    /**
-     * Add an unpinned batch
-     * @param batch Unpinned batch to add
-     */        
-    public void addUnpinned(ManagedBatch batch) {
-        unpinned.add(batch);
-    }
-    
-    /**
-     * Remove an unpinned batch
-     * @param batch Batch to remove
-     */
-    public void removeUnpinned(ManagedBatch batch) {
-        unpinned.remove(batch);
-    }
-
-    /**
-     * Get an iterator on all unpinned batches, typically for clean up 
-     * purposes.  This iterator is "safe" in that it is based on a copy
-     * of the real list and will not be invalidated by changes to the original
-     * list.  However, this means that it also may contain batches that 
-     * are no longer in the unpinned list, so the user of this iterator 
-     * should check that each batch is still unpinned.
-     * @return Safe (but possibly out of date) iterator on unpinned batches
-     */
-    public Iterator<ManagedBatch> getAllUnpinned() {
-    	synchronized (unpinned) {
-            List<ManagedBatch> copy = new ArrayList<ManagedBatch>(unpinned);
-            return copy.iterator();    
-		}
-    }
-    
-    public synchronized Map<TupleSourceID, Map<Integer, ManagedBatch>> getAllPinned() {
-        return new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>(pinned);    
-    }
-        
-    public Map<TupleSourceID, Map<Integer, ManagedBatch>> getPinnedByCurrentThread() {
-        return PINNED_BY_THREAD.get();
-    }
-
-}

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -22,6 +22,7 @@
 
 package com.metamatrix.common.buffer.impl;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -38,8 +39,8 @@
     
     private String groupName;
     /** The bytes of memory used by this tuple group*/
-    private long memoryUsed;
-    private Set<TupleSourceID> tupleSourceIDs = new HashSet<TupleSourceID>();
+    private volatile long memoryUsed;
+    private Set<TupleSourceID> tupleSourceIDs = Collections.synchronizedSet(new HashSet<TupleSourceID>());
     
     TupleGroupInfo(String groupName) {
         this.groupName = groupName;
@@ -54,17 +55,14 @@
     }
     
     long getGroupMemoryUsed() {
-        // no locking required. See MemoryState.NOTE1
         return memoryUsed;
     }
     
     long reserveMemory(long bytes) {
-        // no locking required. See MemoryState.NOTE1
         return memoryUsed += bytes;
     }
     
     long releaseMemory(long bytes) {
-        // no locking required. See MemoryState.NOTE1
         return memoryUsed -= bytes;
     }
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -22,30 +22,55 @@
 
 package com.metamatrix.common.buffer.impl;
 
-import java.util.*;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.metamatrix.common.buffer.TupleSourceID;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
 import com.metamatrix.common.types.DataTypeManager;
+import com.metamatrix.common.types.Streamable;
 
 /**
  * Describe a TupleSource and all important information about it.
  */
 public class TupleSourceInfo {
     
+	private static final AtomicLong LOB_ID = new AtomicLong();
+	private static final ReferenceQueue<Streamable<?>> LOB_QUEUE = new ReferenceQueue<Streamable<?>>();
+	
+	private static class LobReference extends PhantomReference<Streamable<?>> {
+		
+		String persistentStreamId;
+		
+		public LobReference(Streamable<?> lob) {
+			super(lob, LOB_QUEUE);
+			this.persistentStreamId = lob.getPersistenceStreamId();
+		}		
+	}
+	
     private TupleSourceType type;       // Type of TupleSource, as defined in BufferManager constants
     private TupleSourceID tsID;
     private List schema;
     private String[] types;
     private int rowCount;
-    private TupleSourceStatus status;
+    private TupleSourceStatus status = TupleSourceStatus.ACTIVE;
     private TupleGroupInfo groupInfo;  
     private boolean removed = false;
     private TreeMap<Integer, ManagedBatch> batches = new TreeMap<Integer, ManagedBatch>();
-
+    private Map<String, Streamable<?>> lobReferences; //references to contained lobs
     private boolean lobs;
     
+    @SuppressWarnings("unused")
+	private LobReference containingLobReference; //reference to containing lob
+    
     /**
      * Construct a TupleSourceInfo given information about it.
      * @param tsID Identifier 
@@ -58,16 +83,51 @@
         this.schema = schema;
         this.types = types;
         this.groupInfo = groupInfo;
-        this.status = TupleSourceStatus.ACTIVE;
-        this.rowCount = 0;   
         this.type = type;
         this.lobs = checkForLobs();
     }
     
+    public void setContainingLobReference(Streamable<?> s) {
+		this.containingLobReference = new LobReference(s);
+	}
+    
+    public void addLobReference(Streamable<Object> lob) {
+    	String id = lob.getReferenceStreamId();
+    	if (id == null) {
+    		id = String.valueOf(LOB_ID.getAndIncrement());
+    		lob.setReferenceStreamId(id);
+    	}
+    	if (this.lobReferences == null) {
+    		this.lobReferences = Collections.synchronizedMap(new HashMap<String, Streamable<?>>());
+    	}
+    	this.lobReferences.put(id, lob);
+    }
+    
+    public static String getStaleLobTupleSource() {
+    	LobReference ref = (LobReference)LOB_QUEUE.poll();
+    	if (ref == null) {
+    		return null;
+    	}
+    	return ref.persistentStreamId;
+    }
+    
+    public Streamable<?> getLobReference(String id) {
+    	if (this.lobReferences == null) {
+    		return null;
+    	}
+    	return this.lobReferences.get(id);
+    }
+    
     public void addBatch(ManagedBatch batch) {
         batches.put(batch.getBeginRow(), batch);
     }
     
+    /**
+     * Returns the batch containing the begin row or null
+     * if it doesn't exist
+     * @param beginRow
+     * @return
+     */
     public ManagedBatch getBatch(int beginRow) {
         Map.Entry<Integer, ManagedBatch> entry = batches.floorEntry(beginRow);
         if (entry != null && entry.getValue().getEndRow() >= beginRow) {
@@ -184,19 +244,16 @@
         return "TupleSourceInfo[" + this.tsID + "]";     //$NON-NLS-1$ //$NON-NLS-2$
     }            
     
-    
     private boolean checkForLobs() {
-        boolean lob = false;
-        if (types != null) {
-            for (int i = 0; i < types.length; i++) {
-                lob |= DataTypeManager.isLOB(types[i]);
+        if (types == null) {
+            // assume the worst
+        	return true;
+        }
+        for (int i = 0; i < types.length; i++) {
+            if (DataTypeManager.isLOB(types[i]) || types[i] == DataTypeManager.DefaultDataTypes.OBJECT) {
+            	return true;
             }
         }
-        else {
-            // if incase the user did not specify the types, then make
-            // them walk the batch; pay the penalty of performence
-            return true;
-        }
-        return lob;
+        return false;
     }
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -41,7 +41,6 @@
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.buffer.BufferManagerPropertyNames;
-import com.metamatrix.common.buffer.LobTupleBatch;
 import com.metamatrix.common.buffer.StorageManager;
 import com.metamatrix.common.buffer.TupleBatch;
 import com.metamatrix.common.buffer.TupleSourceID;
@@ -172,16 +171,6 @@
     public void addBatch(TupleSourceID sourceID, TupleBatch batch, String[] types)
         throws MetaMatrixComponentException {
 
-    	/* Right now we do not support the saving of the lobs to the disk.
-         * by throwing an exception the memory is never released for lobs, which is same
-         * as keeping them in a map.  This is not going to be memory hog because, the actual
-         * lob (clob or blob) are backed by connector, xml is backed by already persisted 
-         * tuple source. Here we are only saving the referenes to the actual objects.
-         */
-        if (batch instanceof LobTupleBatch) {
-        	throw new MetaMatrixComponentException(QueryExecPlugin.Util.getString("FileStorageManager.can_not_save_lobs")); //$NON-NLS-1$
-        }
-        
         // Defect 13342 - addBatch method now creates spill files if the total bytes exceeds the max file size limit
         TupleSourceInfo tsInfo = getTupleSourceInfo(sourceID, true);
         synchronized (tsInfo) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/QueryProcessor.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -181,7 +181,10 @@
         	}
         	throw new MetaMatrixComponentException(e);
         } finally {
-            bufferMgr.releasePinnedBatches();
+        	//iff this is the root command context release any pinned (Unclosed tuplesources)
+        	if (this.context.getParent() == null) {
+        		bufferMgr.releasePinnedBatches(); 
+        	}
         }
 
 		if(done || requestClosed) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/dynamic/XMLSource.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -24,7 +24,6 @@
 
 import java.io.StringReader;
 import java.util.List;
-import java.util.Properties;
 
 import javax.xml.transform.Source;
 import javax.xml.transform.stream.StreamSource;
@@ -32,10 +31,7 @@
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.buffer.BufferManager;
 import com.metamatrix.common.buffer.TupleSource;
-import com.metamatrix.common.buffer.TupleSourceID;
-import com.metamatrix.common.types.InvalidReferenceException;
 import com.metamatrix.common.types.XMLType;
-import com.metamatrix.query.processor.xml.XMLUtil;
 
 
 /** 
@@ -59,12 +55,7 @@
                     // as processing excceptions.
                     if (value instanceof XMLType) {
                         XMLType xml = (XMLType)value;
-                        try {
-                            return xml.getSource(null);
-                        } catch (InvalidReferenceException e) {
-                            xml = XMLUtil.getFromBufferManager(bufferMgr, new TupleSourceID(xml.getPersistenceStreamId()), new Properties());
-                            return xml.getSource(null);
-                        } 
+                        return xml.getSource(null);
                     }
                     return new StreamSource(new StringReader((String)value));
                 }

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/proc/ProcedurePlan.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -561,7 +561,6 @@
         String rsKey = rsName.toUpperCase();
         CursorState state = this.cursorStates.remove(rsKey);
         if(state != null) {
-        	state.ts.closeSource();
             try {
     			this.bufferMgr.removeTupleSource(state.tsID);
     		} catch (TupleSourceNotFoundException e) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentCriteriaProcessor.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -36,7 +36,6 @@
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.query.ExpressionEvaluationException;
 import com.metamatrix.common.buffer.BlockedException;
-import com.metamatrix.common.buffer.TupleSource;
 import com.metamatrix.common.buffer.TupleSourceNotFoundException;
 import com.metamatrix.query.rewriter.QueryRewriter;
 import com.metamatrix.query.sql.lang.AbstractSetCriteria;
@@ -88,12 +87,6 @@
 	                    sortSymbols.add(dependentSetStates.get(i).valueExpression);
 	                }
 	                DependentValueSource originalVs = (DependentValueSource)dependentNode.getContext().getVariableContext().getGlobalValue(valueSource);
-	                TupleSource ts;
-					try {
-						ts = dependentNode.getBufferManager().getTupleSource(originalVs.getTupleSourceID());
-					} catch (TupleSourceNotFoundException e) {
-						throw new MetaMatrixComponentException(e);
-					}
 	                this.sortUtility = new SortUtility(originalVs.getTupleSourceID(), sortSymbols, sortDirection, true, dependentNode.getBufferManager(), dependentNode.getConnectionID());
             	}
             	dvs = new DependentValueSource(sortUtility.sort(), dependentNode.getBufferManager());

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/DependentValueSource.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -103,6 +103,7 @@
         			result.add(value);
         		}
         	}
+        	its.closeSource();
         	if (cachedSets == null) {
         		cachedSets = new HashMap<Expression, HashSet<Object>>();
         	}

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/GroupingNode.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -368,7 +368,7 @@
             currentGroupTuple = null;
             groupBegin++;
         }
-        
+        this.groupTupleSource.closeSource();
         if(rowCount != 0 || sortElements == null) {
             // Close last group
             List row = new ArrayList(functions.length);

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/PartitionedSortJoin.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -299,6 +299,7 @@
     			matchEnd = -1;
         		partitionedTuple = null;
     		}
+    		currentSource.closeSource();
     		currentSource = null;
     		currentPartition++;
     	}

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/relational/SortUtility.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -363,6 +363,7 @@
 			} finally {
 				tc.saveBatch();
 			}
+			outTs.closeSource();
         	outTs = null;
         }
 	}

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLPlan.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -71,7 +71,6 @@
 import com.metamatrix.common.lob.LobChunk;
 import com.metamatrix.common.log.LogManager;
 import com.metamatrix.common.types.DataTypeManager;
-import com.metamatrix.common.types.InvalidReferenceException;
 import com.metamatrix.common.types.SQLXMLImpl;
 import com.metamatrix.common.types.Streamable;
 import com.metamatrix.common.types.XMLType;
@@ -274,7 +273,7 @@
             // if this is the first chunk, then create a tuple source id for this sequence of chunks
             if (!this.docInProgress) {
                 this.docInProgress = true;
-                this.docInProgressTupleSourceId = XMLUtil.createXMLTupleSource(this.bufferMgr, this.resultsTupleSourceId.getStringID());
+                this.docInProgressTupleSourceId = XMLUtil.createXMLTupleSource(this.bufferMgr, this.getContext().getConnectionID());
                 this.chunkPosition = 1;
             }
             
@@ -287,8 +286,8 @@
                 
                 // we want this to be naturally feed by chunks whether inside
                 // or out side the processor
-                xml = new XMLType();
-                xml.setPersistenceStreamId(this.docInProgressTupleSourceId.getStringID());
+                xml = new XMLType(XMLUtil.getFromBufferManager(bufferMgr, this.docInProgressTupleSourceId, getProperties()));
+                this.bufferMgr.setPersistentTupleSource(this.docInProgressTupleSourceId, xml);
                 
                 //reset current document state.
                 this.docInProgress = false;
@@ -356,12 +355,7 @@
         Reader source = null;
 
         try {        
-            try {
-                source = xmlDoc.getCharacterStream();
-            } catch (InvalidReferenceException e) {
-                xmlDoc = XMLUtil.getFromBufferManager(this.bufferMgr, new TupleSourceID(xmlDoc.getPersistenceStreamId()), props);
-                source = xmlDoc.getCharacterStream();
-            }
+            source = xmlDoc.getCharacterStream();
         
             // Validate against schema
             if(this.shouldValidate) {

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xml/XMLUtil.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -32,18 +32,17 @@
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.common.buffer.BufferManager;
+import com.metamatrix.common.buffer.BufferManagerLobChunkStream;
 import com.metamatrix.common.buffer.TupleSourceID;
 import com.metamatrix.common.buffer.TupleSourceNotFoundException;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
 import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
-import com.metamatrix.common.lob.BufferManagerLobChunkStream;
 import com.metamatrix.common.lob.ByteLobChunkStream;
 import com.metamatrix.common.lob.LobChunk;
 import com.metamatrix.common.lob.LobChunkInputStream;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.common.types.SQLXMLImpl;
 import com.metamatrix.common.types.XMLReaderFactory;
-import com.metamatrix.common.types.XMLType;
 import com.metamatrix.query.sql.symbol.ElementSymbol;
 
 
@@ -95,15 +94,8 @@
      * This will reconstruct the XML object from the buffer manager from given 
      * buffer manager id. 
      */
-    public static XMLType getFromBufferManager(final BufferManager bufferMgr, final TupleSourceID sourceId, Properties props) {
-        SQLXML sqlXML = new SQLXMLImpl(new BufferMangerXMLReaderFactory(bufferMgr, sourceId), props);
-        
-        // this is object to be sent to the client. The reference
-        // id will be set by the buffer manager.
-        XMLType xml = new XMLType(sqlXML);
-        xml.setPersistenceStreamId(sourceId.getStringID());
-        
-        return xml;
+    public static SQLXML getFromBufferManager(final BufferManager bufferMgr, final TupleSourceID sourceId, Properties props) {
+        return new SQLXMLImpl(new BufferMangerXMLReaderFactory(bufferMgr, sourceId), props);
     }
     
     /**

Modified: trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/processor/xquery/XQueryPlan.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -160,21 +160,15 @@
             List rows = new ArrayList(1);
             List row = new ArrayList(1);
 
-            // this may be little confusing, but the top layer is not immediately going
-            // to disk for saving; when it does it only saves the streaming id, not the
-            // contents. The below one saves immediately to disk, and when client refers to top
-            // id, processor know about the *saved* and gets the contents from it.
+            TupleSourceID savedId = XMLUtil.saveToBufferManager(this.bufferMgr, this.getContext().getConnectionID(), srcXML, this.chunkSize);
+
+            //for large documents use the buffermanager version instead
+            if (this.bufferMgr.getFinalRowCount(savedId) > 1) {
+            	srcXML = XMLUtil.getFromBufferManager(this.bufferMgr, savedId, getFormatProperties());
+            }
             
-            // the one which saves to disk
-            TupleSourceID savedId = XMLUtil.saveToBufferManager(this.bufferMgr, this.resultsTupleSourceId.getStringID(), srcXML, this.chunkSize);
-            
-            // here we have 2 options; create xml from original source or from buffer
-            // manager. since buffer manager is slow we will choose the first option.
-            // incase this xml used in processor it will be faster; if it used in the
-            // client using steaming will be slow.
             XMLType xml = new XMLType(srcXML);
-            xml.setPersistenceStreamId(savedId.getStringID());
-            //XMLValue xml = XMLUtil.getFromBufferManager(this.bufferMgr, savedId, this.chunkSize, getFormatProperties());
+            this.bufferMgr.setPersistentTupleSource(savedId, xml);
             
             // now build the top batch with information from the saved one.
             row.add(xml);

Modified: trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/util/CommandContext.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -30,18 +30,14 @@
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.query.QueryProcessingException;
-import com.metamatrix.common.util.PropertiesUtils;
 import com.metamatrix.core.util.ArgCheck;
 import com.metamatrix.query.QueryPlugin;
 import com.metamatrix.query.eval.SecurityFunctionEvaluator;
 import com.metamatrix.query.execution.QueryExecPlugin;
 import com.metamatrix.query.optimizer.relational.PlanToProcessConverter;
 import com.metamatrix.query.processor.QueryProcessor;
-import com.metamatrix.query.sql.symbol.ContextReference;
 import com.metamatrix.query.sql.symbol.ElementSymbol;
 import com.metamatrix.query.sql.symbol.Expression;
-import com.metamatrix.query.sql.util.ValueIterator;
-import com.metamatrix.query.sql.util.ValueIteratorSource;
 import com.metamatrix.query.sql.util.VariableContext;
 
 /** 
@@ -159,6 +155,10 @@
     public CommandContext() {        
     }
     
+    public CommandContext getParent() {
+		return parent;
+	}
+    
     public boolean isSessionFunctionEvaluated() {
     	if (parent != null) {
     		return parent.isSessionFunctionEvaluated();

Modified: trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/com/metamatrix/query/xquery/saxon/SaxonXQueryExpression.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -44,10 +44,7 @@
 import com.metamatrix.api.exception.MetaMatrixComponentException;
 import com.metamatrix.api.exception.MetaMatrixProcessingException;
 import com.metamatrix.common.types.SQLXMLImpl;
-import com.metamatrix.core.util.StringUtil;
 import com.metamatrix.query.QueryPlugin;
-import com.metamatrix.query.eval.Evaluator;
-import com.metamatrix.query.sql.symbol.Expression;
 import com.metamatrix.query.util.XMLFormatConstants;
 import com.metamatrix.query.xquery.XQueryExpression;
 import com.metamatrix.query.xquery.XQuerySQLEvaluator;

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobChunkStream.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -27,17 +27,14 @@
 import java.sql.SQLException;
 
 import com.metamatrix.common.buffer.BufferManager;
-import com.metamatrix.common.lob.BufferManagerLobChunkStream;
 import com.metamatrix.common.lob.ByteLobChunkStream;
 import com.metamatrix.common.lob.LobChunk;
 import com.metamatrix.common.lob.LobChunkProducer;
 import com.metamatrix.common.lob.ReaderInputStream;
 import com.metamatrix.common.types.BlobType;
 import com.metamatrix.common.types.ClobType;
-import com.metamatrix.common.types.InvalidReferenceException;
 import com.metamatrix.common.types.Streamable;
 import com.metamatrix.common.types.XMLType;
-import com.metamatrix.dqp.DQPPlugin;
 
 /** 
  * A Lob Stream builder class. Given the Lob object this object can build 
@@ -48,7 +45,7 @@
 
     LobChunkProducer internalStream = null;
     
-    public LobChunkStream(Streamable streamable, int chunkSize, BufferManager bufferMgr) 
+    public LobChunkStream(Streamable<?> streamable, int chunkSize, BufferManager bufferMgr) 
         throws IOException {
         
         try {
@@ -64,14 +61,6 @@
                 BlobType blob = (BlobType)streamable;
                 this.internalStream = new ByteLobChunkStream(blob.getBinaryStream(), chunkSize);                        
             }
-        } catch (InvalidReferenceException e) {
-            // if the lob did not have a persistent id, there is no way for us to re-create the
-            // object. so throw an error.
-            if (streamable.getPersistenceStreamId() == null) {
-                throw new IOException(DQPPlugin.Util.getString("LobStream.noreference")); //$NON-NLS-1$
-            }            
-            // otherwise read directly from the buffer manager. 
-            this.internalStream = new BufferManagerLobChunkStream(streamable.getPersistenceStreamId(), bufferMgr);            
         } catch(SQLException e) {
             IOException ex = new IOException();
             ex.initCause(e);

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/LobWorkItem.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -23,14 +23,9 @@
 package org.teiid.dqp.internal.process;
 
 import java.io.IOException;
-import java.util.List;
 
 import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.CommonPlugin;
 import com.metamatrix.common.buffer.BlockedOnMemoryException;
-import com.metamatrix.common.buffer.MemoryNotAvailableException;
-import com.metamatrix.common.buffer.TupleBatch;
-import com.metamatrix.common.buffer.TupleSourceID;
 import com.metamatrix.common.buffer.TupleSourceNotFoundException;
 import com.metamatrix.common.comm.api.ResultsReceiver;
 import com.metamatrix.common.lob.LobChunk;
@@ -73,7 +68,7 @@
         	// If no previous stream is not found for this request create one and 
             // save for future 
             if (stream == null) {
-                stream = createLobStream(new TupleSourceID(streamId));
+                stream = createLobStream(streamId);
             }
             
             // now get the chunk from stream
@@ -119,35 +114,13 @@
      * Create a object which can create a sequence of LobChunk objects on a given
      * LOB object 
      */
-    private LobChunkStream createLobStream(TupleSourceID referenceStreamId) 
+    private LobChunkStream createLobStream(String referenceStreamId) 
         throws BlockedOnMemoryException, MetaMatrixComponentException, IOException, TupleSourceNotFoundException {
         
         // get the reference object in the buffer manager, and try to stream off
         // the original sources.
-        TupleBatch batch = null;
-        try {
-            batch = dqpCore.getBufferManager().pinTupleBatch(referenceStreamId, 1, 1);
-            List[] tuples = batch.getAllTuples();
-
-            if (tuples != null && tuples.length > 0) {
-                Object anObj = tuples[0].get(0);
-                if (anObj instanceof Streamable) {
-                    Streamable streamable = (Streamable)anObj;
-                    return new LobChunkStream(streamable, chunkSize, dqpCore.getBufferManager());                        
-                }                                    
-            } 
-            throw new MetaMatrixComponentException(DQPPlugin.Util.getString("ProcessWorker.wrongdata")); //$NON-NLS-1$
-        } catch (MemoryNotAvailableException e) {
-            throw BlockedOnMemoryException.INSTANCE;
-        } finally {
-            try {
-                if (batch != null) {
-                	dqpCore.getBufferManager().unpinTupleBatch(referenceStreamId, batch.getBeginRow(), batch.getEndRow());
-                }
-            } catch (MetaMatrixComponentException e) {
-            	LogManager.logDetail(LogConstants.CTX_DQP, e, "Call to unpin failed during lob stream creation"); //$NON-NLS-1$
-            } 
-        }        
+        Streamable<?> streamable = dqpCore.getBufferManager().getStreamable(parent.resultsID, referenceStreamId);
+        return new LobChunkStream(streamable, chunkSize, dqpCore.getBufferManager());                        
     }
     
     synchronized void setResultsReceiver(ResultsReceiver<LobChunk> resultsReceiver) {

Modified: trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -141,7 +141,7 @@
     protected Command originalCommand;
     private AnalysisRecord analysisRecord;
     private TransactionContext transactionContext;
-    private TupleSourceID resultsID;
+    protected TupleSourceID resultsID;
     private Collection schemas;     // These are schemas associated with XML results
     private boolean returnsUpdateCount;
     

Modified: trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -236,19 +236,7 @@
         assertTrue(xml1.getPersistenceStreamId() == null);
         assertTrue(xml2.getPersistenceStreamId() == null);
         
-        TupleSourceInfo info = mgr.getTupleSourceInfo(new TupleSourceID(xml1.getReferenceStreamId()), true);
-        // make sure the group name of the reference lob, is same as part batch id
-        assertEquals(id.getStringID(), info.getGroupInfo().getGroupName());
-     
-        // now delete the parent tuple source, this should delete the 
-        // all the kids with same name
-        mgr.removeTupleSource(id);
-                
-        try {
-            mgr.getTupleSource(new TupleSourceID(xml2.getReferenceStreamId()));
-            fail("this is already should have been cleaned up by above one"); //$NON-NLS-1$
-        } catch (TupleSourceNotFoundException e) {
-        }
+        assertNotNull(mgr.getStreamable(id, xml1.getReferenceStreamId()));
     }
     
     public void testAddStreamablePart() throws Exception {

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/TestProcessor.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -50,6 +50,7 @@
 import com.metamatrix.common.buffer.BufferManagerFactory;
 import com.metamatrix.common.buffer.TupleSource;
 import com.metamatrix.common.buffer.TupleSourceID;
+import com.metamatrix.common.buffer.TupleSourceNotFoundException;
 import com.metamatrix.common.buffer.impl.BufferManagerImpl;
 import com.metamatrix.common.types.DataTypeManager;
 import com.metamatrix.common.types.XMLType;
@@ -215,16 +216,18 @@
         }
     }
 
-    private void helpProcessException(ProcessorPlan plan, ProcessorDataManager dataManager) {
+    private void helpProcessException(ProcessorPlan plan, ProcessorDataManager dataManager) throws TupleSourceNotFoundException, MetaMatrixComponentException {
         helpProcessException(plan, dataManager, null);
     }
     
-    private void helpProcessException(ProcessorPlan plan, ProcessorDataManager dataManager, String expectedErrorMessage) {
-
+    private void helpProcessException(ProcessorPlan plan, ProcessorDataManager dataManager, String expectedErrorMessage) throws TupleSourceNotFoundException, MetaMatrixComponentException {
+    	TupleSourceID tsId = null;
+    	BufferManager bufferMgr = null;
         try {   
-            BufferManager bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
+            bufferMgr = BufferManagerFactory.getStandaloneBufferManager();
             CommandContext context = new CommandContext("0", "test", null, null, null); //$NON-NLS-1$ //$NON-NLS-2$
             QueryProcessor processor = new QueryProcessor(plan, context, bufferMgr, dataManager);
+            tsId = processor.getResultsID();
             processor.process();
             fail("Expected error during processing, but got none."); //$NON-NLS-1$
         } catch(MetaMatrixCoreException e) {
@@ -232,6 +235,8 @@
             if(expectedErrorMessage != null) {
                 assertEquals(expectedErrorMessage, e.getMessage());
             }
+        } finally {
+        	bufferMgr.removeTupleSource(tsId);
         }
     }
         
@@ -2296,7 +2301,7 @@
      * Tests a scalar subquery which returns more than one rows
      * causes the expected Exception
      */
-    @Test public void testSubqueryScalarException() {
+    @Test public void testSubqueryScalarException() throws Exception {
         String sql = "SELECT e1, (SELECT e2 FROM pm2.g1) FROM pm1.g1"; //$NON-NLS-1$
 
         // Construct data manager with data

Modified: trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/engine/src/test/java/com/metamatrix/query/processor/relational/TestSortNode.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -107,13 +107,12 @@
     }
 
     public static int getIntBatchSize() {
-        List[] expected = new List[] { 
-                Arrays.asList(new Object[] { new Integer(0) }), 
-           };     
+        List[] expected = new List[BATCH_SIZE];
+        Arrays.fill(expected, Arrays.asList(1));
         
         String[] types = { "integer" };     //$NON-NLS-1$
 
-        int size = (int)SizeUtility.getBatchSize( types, expected ) * BATCH_SIZE;
+        int size = (int)SizeUtility.getBatchSize( types, expected );
         return size;
     }
     

Modified: trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
===================================================================
--- trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java	2009-08-12 16:36:33 UTC (rev 1234)
+++ trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java	2009-08-12 17:39:14 UTC (rev 1235)
@@ -54,7 +54,6 @@
     private static final String DEFAULT_MANAGEMENT_INTERVAL = "0"; //$NON-NLS-1$
     private static final String DEFAULT_LOG_STATS_INTERVAL = DEFAULT_MANAGEMENT_INTERVAL;
     private static final String DEFAULT_SESSION_USE_PERCENTAGE = "100"; //$NON-NLS-1$
-    private static final String DEFAULT_ID_CREATOR = "com.metamatrix.common.buffer.impl.LongIDCreator"; //$NON-NLS-1$
     private static final String DEFAULT_MAX_OPEN_FILES = "10"; //$NON-NLS-1$
     
     // Instance



More information about the teiid-commits mailing list