[teiid-commits] teiid SVN: r3553 - in trunk: client/src/main/java/org/teiid/client and 5 other directories.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sun Oct 16 23:12:03 EDT 2011


Author: shawkins
Date: 2011-10-16 23:12:03 -0400 (Sun, 16 Oct 2011)
New Revision: 3553

Added:
   trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
Removed:
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
Modified:
   trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
   trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
   trunk/client/src/main/java/org/teiid/client/ResultsMessage.java
   trunk/client/src/test/java/org/teiid/client/TestBatchSerializer.java
   trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml
   trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
   trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
   trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-1750 correcting locking with an immutable key and updating jboss-beans

Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml	2011-10-17 03:12:03 UTC (rev 3553)
@@ -48,6 +48,14 @@
              However inline lob values are not supported by pre-7.6 clients, so disable this 
              property if using older clients utilizing lobs. (default true) -->
         <property name="inlineLobs">true</property>
+        <!-- Memory buffer space used by the buffer manager in MB.  -1 determines the setting automatically from the maxReserveKB (default -1).  
+    	     This value cannot be smaller than maxStorageObjectSize. -->
+        <property name="maxMemoryBufferSpace">-1</property>
+        <!-- Set to true to hold the memory buffer off-heap. If true you must ensure that the VM can allocate that much direct memory (default false). -->
+        <property name="memoryBufferOffHeap">false</property>
+        <!-- The maximum size of a buffer managed object (typically a table page or a results batch) in bytes (default 8388608 or 8MB).  
+        	 Setting this value too high will reduce the effectiveness of the memory buffer.-->
+        <property name="maxStorageObjectSize">8388608</property>
     </bean>
     
     <bean name="CacheFactory" class="org.teiid.cache.jboss.ClusterableCacheFactory">

Modified: trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/BatchSerializer.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/client/src/main/java/org/teiid/client/BatchSerializer.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -22,21 +22,28 @@
 
 package org.teiid.client;
 
+import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.ObjectStreamConstants;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TimeZone;
 
 import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.BlobType;
+import org.teiid.core.types.ClobType;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.XMLType;
 import org.teiid.jdbc.JDBCPlugin;
 
 
@@ -67,6 +74,90 @@
         serializers.put(DataTypeManager.DefaultDataTypes.TIMESTAMP,     new TimestampColumnSerializer());
     }
     
+    private static final Map<String, ColumnSerializer> version1serializers = new HashMap<String, ColumnSerializer>(128);
+    static {
+    	version1serializers.put(DataTypeManager.DefaultDataTypes.DATE,          new DateColumnSerializer1());
+    	version1serializers.put(DataTypeManager.DefaultDataTypes.TIME,          new TimeColumnSerializer1());
+    	version1serializers.put(DataTypeManager.DefaultDataTypes.STRING,     	new StringColumnSerializer1());
+    	version1serializers.put(DataTypeManager.DefaultDataTypes.CLOB,     		new ClobColumnSerializer1());
+    	version1serializers.put(DataTypeManager.DefaultDataTypes.BLOB,     		new BlobColumnSerializer1());
+    	version1serializers.put(DataTypeManager.DefaultDataTypes.XML,     		new XmlColumnSerializer1());
+    	version1serializers.put(DataTypeManager.DefaultDataTypes.NULL,     		new NullColumnSerializer1());
+    	//TODO: do better with just an object column
+    }
+    
+    private static final int MAX_UTF = 0xFFFF/3; //this is greater than the expected max length of Teiid Strings
+    
+    private static class StringColumnSerializer1 extends ColumnSerializer {
+    	@Override
+    	protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+    		String str = (String)obj;
+        	if (str.length() <= MAX_UTF) {
+        		//skip object serialization if we have a short string
+        	    out.writeByte(ObjectStreamConstants.TC_STRING);
+        	    out.writeUTF(str);
+        	} else {
+        		out.writeByte(ObjectStreamConstants.TC_LONGSTRING);
+        		out.writeObject(obj);
+        	}
+        }
+    	
+    	@Override
+    	protected Object readObject(ObjectInput in) throws IOException,
+    			ClassNotFoundException {
+    		if (in.readByte() == ObjectStreamConstants.TC_STRING) {
+    			return in.readUTF();
+    		}
+    		return super.readObject(in);
+    	}
+    }
+
+    private static class NullColumnSerializer1 extends ColumnSerializer {
+    	@Override
+    	public void writeColumn(ObjectOutput out, int col,
+    			List<? extends List<?>> batch) throws IOException {
+    	}
+    	
+    	@Override
+    	public void readColumn(ObjectInput in, int col,
+    			List<List<Object>> batch, byte[] isNull) throws IOException,
+    			ClassNotFoundException {
+    	}
+    }
+    
+    private static class ClobColumnSerializer1 extends ColumnSerializer {
+        protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+        	((Externalizable)obj).writeExternal(out);
+        }
+        protected Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+        	ClobType ct = new ClobType();
+        	ct.readExternal(in);
+            return ct;
+        }
+    }
+
+    private static class BlobColumnSerializer1 extends ColumnSerializer {
+        protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+        	((Externalizable)obj).writeExternal(out);
+        }
+        protected Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+        	BlobType bt = new BlobType();
+        	bt.readExternal(in);
+            return bt;
+        }
+    }
+
+    private static class XmlColumnSerializer1 extends ColumnSerializer {
+        protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+        	((Externalizable)obj).writeExternal(out);
+        }
+        protected Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+        	XMLType xt = new XMLType();
+        	xt.readExternal(in);
+            return xt;
+        }
+    }
+
     /**
      * Packs the (boolean) information about whether data values in the column are null
      * into bytes so that we send ~n/8 instead of n bytes.
@@ -338,6 +429,34 @@
         }
     }
     
+    static int DATE_NORMALIZER = 0;
+    	
+	static {
+		Calendar c = Calendar.getInstance();
+		c.setTimeZone(TimeZone.getTimeZone("GMT")); //$NON-NLS-1$
+		c.set(1900, 0, 1, 0, 0, 0);
+		c.set(Calendar.MILLISECOND, 0);
+		DATE_NORMALIZER = -(int)(c.getTime().getTime()/60000); //support a 32 bit range starting at this value
+	}
+
+    private static class DateColumnSerializer1 extends ColumnSerializer {
+        protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+            out.writeInt((int)(((java.sql.Date)obj).getTime()/60000) + DATE_NORMALIZER);
+        }
+        protected Object readObject(ObjectInput in) throws IOException {
+            return new java.sql.Date(((in.readInt()&0xffffffffL) - DATE_NORMALIZER)*60000);
+        }
+    }
+    
+    private static class TimeColumnSerializer1 extends ColumnSerializer {
+        protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+            out.writeInt((int)(((Time)obj).getTime()/1000));
+        }
+        protected Object readObject(ObjectInput in) throws IOException {
+            return new Time((in.readInt()&0xffffffffL)*1000);
+        }
+    }
+    
     private static class TimestampColumnSerializer extends ColumnSerializer {
         protected void writeObject(ObjectOutput out, Object obj) throws IOException {
             Timestamp ts =  (Timestamp)obj;
@@ -351,15 +470,25 @@
         }
     }
         
-    private static ColumnSerializer getSerializer(String type) {
-        ColumnSerializer cs = serializers.get((type == null) ? DataTypeManager.DefaultDataTypes.OBJECT : type);
+    private static ColumnSerializer getSerializer(String type, byte version) {
+    	ColumnSerializer cs = null;
+    	if (version == 1) {
+    		cs = version1serializers.get((type == null) ? DataTypeManager.DefaultDataTypes.OBJECT : type);
+    	}
+    	if (cs == null) {
+    		cs = serializers.get((type == null) ? DataTypeManager.DefaultDataTypes.OBJECT : type);
+    	}
         if (cs == null) {
         	return defaultSerializer;
         }
         return cs;
     }
+
+    public static void writeBatch(ObjectOutput out, String[] types, List<? extends List<?>> batch) throws IOException {
+    	writeBatch(out, types, batch, (byte)1);
+    }
     
-    public static void writeBatch(ObjectOutput out, String[] types, List<? extends List<?>> batch) throws IOException {
+    public static void writeBatch(ObjectOutput out, String[] types, List<? extends List<?>> batch, byte version) throws IOException {
         if (batch == null) {
             out.writeInt(-1);
         } else {
@@ -368,7 +497,7 @@
 	            int columns = types.length;
 	            out.writeInt(columns);
 	            for(int i = 0; i < columns; i++) {
-	            	ColumnSerializer serializer = getSerializer(types[i]);
+	            	ColumnSerializer serializer = getSerializer(types[i], version);
 	                try {
 	                    serializer.writeColumn(out, i, batch);
 	                } catch (ClassCastException e) {
@@ -389,6 +518,10 @@
     }
     
     public static List<List<Object>> readBatch(ObjectInput in, String[] types) throws IOException, ClassNotFoundException {
+    	return readBatch(in, types, (byte)1);
+    }
+    
+    public static List<List<Object>> readBatch(ObjectInput in, String[] types, byte version) throws IOException, ClassNotFoundException {
         int rows = in.readInt();
         if (rows == 0) {
             return new ArrayList<List<Object>>(0);
@@ -402,7 +535,7 @@
             }
             byte[] isNullBuffer = new byte[(extraRows > 0) ? numBytes + 1: numBytes];
             for (int col = 0; col < columns; col++) {
-                getSerializer(types[col]).readColumn(in, col, batch, isNullBuffer);
+                getSerializer(types[col], version).readColumn(in, col, batch, isNullBuffer);
             }
             return batch;
         }

Modified: trunk/client/src/main/java/org/teiid/client/ResultsMessage.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/ResultsMessage.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/client/src/main/java/org/teiid/client/ResultsMessage.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -240,7 +240,7 @@
         dataTypes = ExternalizeUtil.readStringArray(in);
 
         // Row data
-        results = BatchSerializer.readBatch(in, dataTypes);
+        results = BatchSerializer.readBatch(in, dataTypes, (byte)0);
 
         // Plan Descriptions
         planDescription = (PlanNode)in.readObject();
@@ -272,7 +272,7 @@
         ExternalizeUtil.writeArray(out, dataTypes);
 
         // Results data
-        BatchSerializer.writeBatch(out, dataTypes, results);
+        BatchSerializer.writeBatch(out, dataTypes, results, (byte)0);
 
         // Plan descriptions
         out.writeObject(this.planDescription);

Modified: trunk/client/src/test/java/org/teiid/client/TestBatchSerializer.java
===================================================================
--- trunk/client/src/test/java/org/teiid/client/TestBatchSerializer.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/client/src/test/java/org/teiid/client/TestBatchSerializer.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -29,15 +29,13 @@
 import java.io.ObjectOutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.List;
 
 import junit.framework.TestCase;
 
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.TimestampWithTimezone;
 
 
 
@@ -47,22 +45,6 @@
  */
 public class TestBatchSerializer extends TestCase {
 
-    private static void assertEqual(List[] expectedBatch, List[] batch) {
-        if (expectedBatch == null) {
-            assertNull(batch);
-            return;
-        }
-        assertEquals(expectedBatch.length, batch.length);
-        if (expectedBatch.length > 0) {
-            int columns = expectedBatch[0].size();
-            for (int row = 0; row < expectedBatch.length; row++) {
-                for (int col = 0; col < columns; col++) {
-                    assertEquals(expectedBatch[row].get(col), batch[row].get(col));
-                }
-            }
-        }
-    }
-    
     private static void helpTestSerialization(String[] types, List<?>[] batch) throws IOException, ClassNotFoundException {
         ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(byteStream);
@@ -108,21 +90,21 @@
         List[] batch = new List[rows];
         
         for (int i = 0; i < rows; i++) {
-            long currentTime = System.currentTimeMillis();
+            java.util.Date d = new java.util.Date();
             Object[] data = { new BigDecimal("" + i), //$NON-NLS-1$
                               new BigInteger(Integer.toString(i)),
                               (i%2 == 0) ? Boolean.FALSE: Boolean.TRUE,
                               new Byte((byte)i),
                               new Character((char)i),
-                              new Date(currentTime),
+                              TimestampWithTimezone.createDate(d),
                               new Double(i),
                               new Float(i),
                               new Integer(i),
                               new Long(i),
                               new Short((short)i),
                               sampleString(i),
-                              new Time(currentTime),
-                              new Timestamp(currentTime)
+                              TimestampWithTimezone.createTime(d),
+                              TimestampWithTimezone.createTimestamp(d)
                             };
             batch[i] = Arrays.asList(data);
         }
@@ -133,22 +115,22 @@
         List[] batch = new List[rows];
         
         for (int i = 0; i < rows; i++) {
-            long currentTime = System.currentTimeMillis();
+        	java.util.Date d = new java.util.Date();
             int mod = i%14;
             Object[] data = { (mod == 0) ? null : new BigDecimal("" + i), //$NON-NLS-1$
                               (mod == 1) ? null : new BigInteger(Integer.toString(i)),
                               (mod == 2) ? null : ((i%2 == 0) ? Boolean.FALSE: Boolean.TRUE),
                               (mod == 3) ? null : new Byte((byte)i),
                               (mod == 4) ? null : new Character((char)i),
-                              (mod == 5) ? null : new Date(currentTime),
+                              (mod == 5) ? null : TimestampWithTimezone.createDate(d),
                               (mod == 6) ? null : new Double(i),
                               (mod == 7) ? null : new Float(i),
                               (mod == 8) ? null : new Integer(i),
                               (mod == 9) ? null : new Long(i),
                               (mod == 10) ? null : new Short((short)i),
                               (mod == 11) ? null : sampleString(i),
-                              (mod == 12) ? null : new Time(currentTime),
-                              (mod == 13) ? null : new Timestamp(currentTime)
+                              (mod == 12) ? null : TimestampWithTimezone.createTime(d),
+                              (mod == 13) ? null : TimestampWithTimezone.createTimestamp(d)
                             };
             batch[i] = Arrays.asList(data);
         }

Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml
===================================================================
--- trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml	2011-10-17 03:12:03 UTC (rev 3553)
@@ -168,8 +168,8 @@
     <section>
         <title>tmp/teiid</title>
         <para>This directory contains temporary files created by Teiid.  These are mostly created by the buffer manager.  
-        These files are not needed across a VM restart.  Heavy usage of large data sets will create lots of temporary files.
-        In heavy usage scenarios, consider pointing the buffer directory at a partition that is routinely defragmented.
+        These files are not needed across a VM restart.  Creation of Teiid lob values (for example through SQL/XML) will typically create one file per lob once it exceeds the allowable
+        in memory size of 8KB.  In heavy usage scenarios, consider pointing the buffer directory at a partition that is routinely defragmented.
         </para>
     </section>
          

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -22,56 +22,34 @@
 
 package org.teiid.common.buffer;
 
-public class BaseCacheEntry implements Comparable<BaseCacheEntry> {
+public class BaseCacheEntry {
 
-	private Long id;
-	protected float lastAccess;
-	protected float orderingValue;
-	
-	public BaseCacheEntry(Long id) {
-		this.id = id;
+	private CacheKey key;
+
+	public BaseCacheEntry(CacheKey key) {
+		this.key = key;
 	}
 	
 	public Long getId() {
-		return id;
+		return key.getId();
 	}
 
 	@Override
 	public int hashCode() {
-		return getId().hashCode();
+		return key.hashCode();
 	}
 	
 	@Override
 	public String toString() {
-		return getId().toString();
+		return key.toString();
 	}
 
-	public float getLastAccess() {
-		return lastAccess;
+	public void setKey(CacheKey key) {
+		this.key = key;
 	}
 	
-	public void setLastAccess(float lastAccess) {
-		this.lastAccess = lastAccess;
+	public CacheKey getKey() {
+		return key;
 	}
-	
-	public float getOrderingValue() {
-		return orderingValue;
-	}
-	
-	public void setOrderingValue(float orderingValue) {
-		this.orderingValue = orderingValue;
-	}
-	
-	@Override
-	public int compareTo(BaseCacheEntry o) {
-		int result = (int) Math.signum(orderingValue - o.orderingValue);
-		if (result == 0) {
-			result = (int)Math.signum(lastAccess - o.lastAccess);
-			if (result == 0) {
-				return Long.signum(id - o.id);
-			}
-		}
-		return result;
-	}
 
 }
\ No newline at end of file

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -82,7 +82,7 @@
 	 * @param s
 	 * @throws Exception
 	 */
-	void add(CacheEntry entry, Serializer<?> s) throws Exception;
+	boolean add(CacheEntry entry, Serializer<?> s) throws Exception;
 	
 	/**
 	 * Remove an entry from the cache

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -30,10 +30,14 @@
 	private int sizeEstimate;
 	private WeakReference<? extends Serializer<?>> serializer;
 	
-	public CacheEntry(Long id) {
-		super(id);
+	public CacheEntry(Long oid) {
+		super(new CacheKey(oid, 0, 0));
 	}
 	
+	public CacheEntry(CacheKey key) {
+		super(key);
+	}
+	
 	public int getSizeEstimate() {
 		return sizeEstimate;
 	}
@@ -42,17 +46,6 @@
 		this.sizeEstimate = sizeEstimate;
 	}
 		
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		if (!(obj instanceof CacheEntry)) {
-			return false;
-		}
-		return getId().equals(((CacheEntry)obj).getId());
-	}
-
-	
 	public Object nullOut() {
 		Object result = getObject();
 		setObject(null);

Added: trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+public class CacheKey implements Comparable<CacheKey> {
+
+	private Long id;
+	protected float lastAccess;
+	protected float orderingValue;
+	
+	public CacheKey(Long id, float lastAccess, float orderingValue) {
+		this.id = id;
+		this.lastAccess = lastAccess;
+		this.orderingValue = orderingValue;
+	}
+	
+	public Long getId() {
+		return id;
+	}
+
+	@Override
+	public int hashCode() {
+		return id.hashCode();
+	}
+	
+	@Override
+	public String toString() {
+		return id.toString();
+	}
+	
+	@Override
+	public boolean equals(Object obj) {
+		if (obj == this) {
+			return true;
+		}
+		if (!(obj instanceof CacheKey)) {
+			return false;
+		}
+		return this.id.equals(((CacheKey)obj).getId());
+	}
+
+	public float getLastAccess() {
+		return lastAccess;
+	}
+	
+	public float getOrderingValue() {
+		return orderingValue;
+	}
+	
+	@Override
+	public int compareTo(CacheKey o) {
+		int result = (int) Math.signum(orderingValue - o.orderingValue);
+		if (result == 0) {
+			result = (int)Math.signum(lastAccess - o.lastAccess);
+			if (result == 0) {
+				return Long.signum(id - o.id);
+			}
+		}
+		return result;
+	}
+
+}
\ No newline at end of file


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -45,6 +45,7 @@
 import org.teiid.common.buffer.BaseCacheEntry;
 import org.teiid.common.buffer.Cache;
 import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.CacheKey;
 import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.Serializer;
 import org.teiid.common.buffer.StorageManager;
@@ -92,6 +93,10 @@
  * we should at least reclaim tail space if the end block is removed.  for now we are just relying
  * on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of the
  * files.
+ * 
+ * The locking is as fine grained as possible to prevent contention.  See {@link PhysicalInfo} for
+ * flags that are used when it is used as a lock.  It is important to not access the
+ * group maps when a {@link PhysicalInfo} lock is held.
  */
 public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>, StorageManager {
 	
@@ -107,7 +112,8 @@
 	static final int DIRECT_POINTERS = 14;
 	static final int EMPTY_ADDRESS = -1;
 	
-	//TODO allow the block size to be configurable
+	//TODO allow the block size to be configurable. 8k is a reasonable default up to a gig, but we could be more efficient with larger blocks from there.
+	//the rationale for a smaller block size is to reduce internal fragmentation, which is critical when maintaining a relatively small buffer < 256MB
 	static final int LOG_BLOCK_SIZE = 13;
 
 	public static final long MAX_ADDRESSABLE_MEMORY = 1l<<(ADDRESS_BITS+LOG_BLOCK_SIZE);
@@ -151,7 +157,7 @@
 				
 		private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
 			if (index >= MAX_DOUBLE_INDIRECT || (mode == Mode.ALLOCATE && index >= maxMemoryBlocks)) {
-				throw new TeiidRuntimeException("Max block number exceeded"); //$NON-NLS-1$
+				throw new TeiidRuntimeException("Max block number exceeded.  Increase the maxStorageObjectSize to support larger storage objects.  Alternatively you could make the processor batch size smaller."); //$NON-NLS-1$
 			}
 			int dataBlock = 0;
 			int position = 0;
@@ -335,7 +341,7 @@
 	
 	private int maxMemoryBlocks;
 	private AtomicLong readAttempts = new AtomicLong();
-	private OrderedCache<Long, PhysicalInfo> memoryBufferEntries = new OrderedCache<Long, PhysicalInfo>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL, readAttempts);
+	LrfuEvictionQueue<PhysicalInfo> memoryBufferEntries = new LrfuEvictionQueue<PhysicalInfo>(readAttempts);
 	private Semaphore memoryWritePermits; //prevents deadlock waiting for free blocks
 	private ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock(true);
 	
@@ -351,7 +357,7 @@
 	private BlockStore[] sizeBasedStores;
 	
 	private AtomicBoolean cleanerRunning = new AtomicBoolean();
-	private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, 0, "FileStore Worker"); //$NON-NLS-1$
+	private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, "FileStore Worker"); //$NON-NLS-1$
 	private final Runnable cleaningTask = new Runnable() {
 		
 		@Override
@@ -410,7 +416,7 @@
 	
 	boolean lowBlocks(boolean critical) {
 		int bitsSet = blocksInuse.getBitsSet();
-		return bitsSet > 0 && (blocks - bitsSet < (critical?criticalCleaningThreshold:cleaningThreshold)) && memoryBufferEntries.firstEntry() != null;
+		return bitsSet > 0 && (blocks - bitsSet < (critical?criticalCleaningThreshold:cleaningThreshold)) && memoryBufferEntries.firstEntry(false) != null;
 	}
 	
 	InodeBlockManager getBlockManager(long gid, long oid, int inode) {
@@ -419,10 +425,11 @@
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public void add(CacheEntry entry, Serializer s) {
+	public boolean add(CacheEntry entry, Serializer s) {
 		if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 			LogManager.logDetail(LogConstants.CTX_DQP, "adding object", s.getId(), entry.getId()); //$NON-NLS-1$
 		}
+		boolean newEntry = false;
 		InodeBlockManager blockManager = null;
 		boolean hasPermit = false;
 		PhysicalInfo info = null;
@@ -430,22 +437,32 @@
 		try {
 			Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
 			if (map == null) {
-				return; //already removed
+				return true; //already removed
 			}
 			info = map.get(entry.getId());
 			if (info == null) {
-				if (!map.containsKey(entry.getId())) {
-					return; //already removed
+				synchronized (map) {
+					info = map.get(entry.getId());
+					if (info == null) {
+						newEntry = true;
+						if (!map.containsKey(entry.getId())) {
+							return true; //already removed
+						}
+						info = new PhysicalInfo(s.getId(), entry.getId(), EMPTY_ADDRESS);
+						map.put(entry.getId(), info);
+					}
 				}
-			} else {
+			}
+			if (!newEntry) {
 				synchronized (info) {
-					//we assume that serialization would be faster than a disk read
-					if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(0, info)) {
-						success = true;
-						return; 
+					if (info.inode == EMPTY_ADDRESS && info.block == EMPTY_ADDRESS) {
+						return false; //someone else is responsible for adding this cache entry
 					}
-					//we should not be in memory since there is no inode assigned
-					assert !memoryBufferEntries.getEvictionQueue().containsKey(info);
+					if (info.evicting || info.inode != EMPTY_ADDRESS
+							|| !shouldPlaceInMemoryBuffer(0, info)) {
+						return true; //safe to remove from tier 1 
+					}
+					//second chance re-add to the cache, we assume that serialization would be faster than a disk read
 				}
 			}
 			//proactively create freespace
@@ -465,21 +482,18 @@
 			memoryWritePermits.acquire();
 			hasPermit = true;
 			blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
-			ExtensibleBufferedOutputStream fsos = new BlockOutputStream(blockManager);
-            ObjectOutputStream oos = new ObjectOutputStream(fsos);
+			BlockOutputStream bos = new BlockOutputStream(blockManager);
+            ObjectOutputStream oos = new ObjectOutputStream(bos);
             oos.writeInt(entry.getSizeEstimate());
             s.serialize(entry.getObject(), oos);
             oos.close();
+        	//synchronized to ensure proper cleanup from a concurrent removal 
             synchronized (map) {
-            	//synchronize to ensure proper cleanup from a concurrent removal 
             	if (physicalMapping.containsKey(s.getId()) && map.containsKey(entry.getId())) {
-            		if (info == null) {
-	           			info = new PhysicalInfo(s.getId(), entry.getId(), blockManager.getInode(), fsos.getBytesWritten());
-		                map.put(entry.getId(), info);
-            		}
         			synchronized (info) {
             			info.inode = blockManager.getInode();
-                		memoryBufferEntries.put(entry.getId(), info);
+            			info.setSize(bos.getBytesWritten());
+                		memoryBufferEntries.touch(info, newEntry);
 					}
             		success = true;
             	}
@@ -494,6 +508,7 @@
 				blockManager.free(false);
 			}
 		}
+        return true;
 	}
 	
 	@Override
@@ -541,16 +556,17 @@
 		boolean inStorage = false;
 		try {
 			synchronized (info) {
+				await(info, true, false);
 				if (info.inode != EMPTY_ADDRESS) {
 					info.pinned = true;
-					PhysicalInfo existing = memoryBufferEntries.get(info.getId()); //touch this entry
-					assert existing == info;
+					memoryBufferEntries.touch(info, false); 
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
 						LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
 					}
 					BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
 					is = new BlockInputStream(manager, info.memoryBlockCount);
 				} else if (info.block != EMPTY_ADDRESS) {
+					assert !info.pinned;
 					inStorage = true;
 					storageReads.incrementAndGet();
 					if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
@@ -574,7 +590,7 @@
 		            synchronized (info) {
 			            info.inode = manager.getInode();
 			            info.pinned = true;
-						memoryBufferEntries.put(info.getId(), info);
+						memoryBufferEntries.touch(info, false);
 					}
 					is = new BlockInputStream(manager, info.memoryBlockCount);
 					success = true;
@@ -582,15 +598,15 @@
 					this.memoryWritePermits.release();
 					if (!success && manager != null) {
 						manager.free(false);
-						info.inode = EMPTY_ADDRESS;
+						synchronized (info) {
+							info.inode = EMPTY_ADDRESS;
+						}
 					}
 				}
 			}
-			CacheEntry ce = new CacheEntry(oid);
+			CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1));
 			ObjectInputStream ois = new ObjectInputStream(is);
 			ce.setSizeEstimate(ois.readInt());
-			ce.setLastAccess(1);
-			ce.setOrderingValue(1);
 			ce.setObject(serializer.deserialize(ois));
 			ce.setPersistent(true);
 			return ce;
@@ -614,10 +630,11 @@
 	 * @return
 	 */
 	private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
-		Map.Entry<PhysicalInfo, Long> lowest = memoryBufferEntries.firstEntry();
+		PhysicalInfo lowest = memoryBufferEntries.firstEntry(false);
+		CacheKey key = info.getKey();
 		return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) > (criticalCleaningThreshold + info.memoryBlockCount)
-				|| (lowest != null && lowest.getKey().block != EMPTY_ADDRESS 
-						&& lowest.getKey().getOrderingValue() < (currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime, info.getLastAccess(), info.getOrderingValue()):info.getOrderingValue()));
+				|| (lowest != null && lowest.block != EMPTY_ADDRESS 
+						&& lowest.getKey().getOrderingValue() < (currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime, key.getLastAccess(), key.getOrderingValue()):key.getOrderingValue()));
 	}
 	
 	@Override
@@ -650,7 +667,7 @@
 			return;
 		}
 		PhysicalInfo info = map.remove(id);
-		free(id, info, false, false);
+		free(info, false, false);
 	}
 
 	@Override
@@ -661,7 +678,7 @@
 		}
 		synchronized (map) {
 			for (Map.Entry<Long, PhysicalInfo> entry : map.entrySet()) {
-				free(entry.getKey(), entry.getValue(), false, false);
+				free(entry.getValue(), false, false);
 			}
 			return map.keySet();
 		}
@@ -673,28 +690,39 @@
 	 * demote && acquireDataBlock -> push out of memory and reuse a datablock
 	 * !demote -> full removal from memory and disk
 	 */
-	int free(Long oid, PhysicalInfo info, boolean demote, boolean acquireDataBlock) {
+	int free(PhysicalInfo info, boolean demote, boolean acquireDataBlock) {
 		if (info == null) {
 			return EMPTY_ADDRESS;
 		}
+		Long oid = info.getId();
 		int result = EMPTY_ADDRESS;
 		BlockManager bm = null;
 		int block = EMPTY_ADDRESS;
-		try {
-			int memoryBlockCount;
-			int sizeIndex;
-			synchronized (info) {
+		int memoryBlockCount;
+		int sizeIndex;
+		synchronized (info) {
+			//if we're a demotion then the free flag was already checked and set 
+			if (!demote) {
+				//let a pending free finish - it would be nice if we could pre-empt
+				//since we can save some work, but this should be rare enough 
+				//to just block
+				await(info, false, true);
 				info.evicting = true;
-				block = info.block;
-				memoryBlockCount = info.memoryBlockCount;
-				sizeIndex = info.sizeIndex;
-				if (info.inode != EMPTY_ADDRESS) {
-					bm = getBlockManager(info.gid, oid, info.inode);
-				} else if (demote) {
-					return EMPTY_ADDRESS;
-				}
-				//release the lock to perform the transfer
+			} else {
+				assert info.evicting;
 			}
+			block = info.block;
+			memoryBlockCount = info.memoryBlockCount;
+			sizeIndex = info.sizeIndex;
+			if (info.inode != EMPTY_ADDRESS) {
+				bm = getBlockManager(info.gid, oid, info.inode);
+			} else if (demote) {
+				return EMPTY_ADDRESS;
+			}
+			//release the lock to perform the transfer
+			//for straight removals this is a little wasteful
+		}
+		try {
 			if (demote && block == EMPTY_ADDRESS) {
 				storageWrites.getAndIncrement();
 				BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
@@ -724,8 +752,11 @@
 				await(info, true, false);
 				info.evicting = false;
 				info.notifyAll();
-				info.inode = EMPTY_ADDRESS;
-				memoryBufferEntries.remove(info.getId());
+				assert bm == null || info.inode != EMPTY_ADDRESS;
+				if (info.inode != EMPTY_ADDRESS) {
+					info.inode = EMPTY_ADDRESS;
+					memoryBufferEntries.remove(info);
+				}
 				if (block != EMPTY_ADDRESS) {
 					if (demote) {
 						info.block = block;
@@ -762,9 +793,8 @@
 	}
 	
 	/**
-	 * Eviction routine.  When space is exhausted datablocks are stolen from
-	 * memory entries
-	 * starvation.
+	 * Eviction routine.  When space is exhausted data blocks are acquired from
+	 * memory entries.
 	 * @param acquire
 	 * @return
 	 */
@@ -776,10 +806,9 @@
 				//doing a cleanup may trigger the purging of resources
 				AutoCleanupUtil.doCleanup();
 				//scan the eviction queue looking for a victim
-				Iterator<Map.Entry<PhysicalInfo, Long>> iter = memoryBufferEntries.getEvictionQueue().entrySet().iterator();
+				Iterator<PhysicalInfo> iter = memoryBufferEntries.getEvictionQueue().iterator();
 				while (((!acquire && lowBlocks(false)) || (acquire && (next = blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
-					Map.Entry<PhysicalInfo, Long> entry = iter.next();
-					PhysicalInfo info = entry.getKey();
+					PhysicalInfo info = iter.next();
 					synchronized (info) {
 						if (info.inode == EMPTY_ADDRESS) {
 							continue;
@@ -803,7 +832,7 @@
 						//mark as evicting early so that other evictFromMemoryCalls don't select this same entry
 						info.evicting = true;
 					}
-					next = free(entry.getValue(), info, true, acquire);
+					next = free(info, true, acquire);
 					if (!acquire) {
 						next = 0; //let the cleaner know that we made progress
 					}
@@ -853,18 +882,22 @@
 		return storageWrites.get();
 	}
 	
+	public long getMemoryBufferSpace() {
+		return memoryBufferSpace;
+	}
+
 }
 
 /**
  * Represents the memory buffer and storage state of an object.
  * It is important to minimize the amount of data held here.
- * Currently should be 40 bytes.
+ * Currently should be 48 bytes.
  */
 final class PhysicalInfo extends BaseCacheEntry {
 	final Long gid;
 	//the memory inode and block count
 	int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
-	final int memoryBlockCount;
+	int memoryBlockCount;
 	//the storage block and BlockStore index
 	int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
 	byte sizeIndex = 0;
@@ -873,10 +906,13 @@
 	boolean evicting; //indicates that the entry will be moved out of the memory buffer
 	boolean loading; //used by tier 1 cache to prevent double loads
 	
-	public PhysicalInfo(Long gid, Long id, int inode, int size) {
-		super(id);
+	public PhysicalInfo(Long gid, Long id, int inode) {
+		super(new CacheKey(id, 0, 0));
 		this.inode = inode;
 		this.gid = gid;
+	}
+	
+	public void setSize(int size) {
 		this.memoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
 		int blocks = memoryBlockCount;
 		while (blocks >= 1) {
@@ -884,4 +920,5 @@
 			blocks>>=2;
 		}
 	}
+	
 }

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -51,6 +51,7 @@
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.Cache;
 import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.CacheKey;
 import org.teiid.common.buffer.FileStore;
 import org.teiid.common.buffer.LobManager;
 import org.teiid.common.buffer.STree;
@@ -120,7 +121,7 @@
 		
 		@Override
 		public void setPrefersMemory(boolean prefers) {
-			//TODO: it's only expected to move from not preferring to prefefring
+			//TODO: it's only expected to move from not preferring to preferring
 			this.prefersMemory.set(prefers);
 		}
 		
@@ -140,10 +141,6 @@
 				throws TeiidComponentException {
 			int sizeEstimate = getSizeEstimate(batch);
 			Long oid = batchAdded.getAndIncrement();
-			CacheEntry ce = new CacheEntry(oid);
-			ce.setObject(batch);
-			ce.setSizeEstimate(sizeEstimate);
-			ce.setSerializer(this.ref);
 			CacheEntry old = null;
 			if (previous != null) {
 				if (removeOld) {
@@ -152,11 +149,16 @@
 					old = fastGet(previous, prefersMemory.get(), true);
 				}
 			}
+			CacheKey key = new CacheKey(oid, 0, old!=null?old.getKey().getOrderingValue():0);
+			CacheEntry ce = new CacheEntry(key);
+			ce.setObject(batch);
+			ce.setSizeEstimate(sizeEstimate);
+			ce.setSerializer(this.ref);
 			if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
 				LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
 			}
 			cache.addToCacheGroup(id, ce.getId());
-			addMemoryEntry(ce, old);
+			addMemoryEntry(ce);
 			return oid;
 		}
 
@@ -234,7 +236,7 @@
 				ce.setSerializer(this.ref);
 				ce.setPersistent(true);
 				if (retain) {
-					addMemoryEntry(ce, null);
+					addMemoryEntry(ce);
 				}
 			} finally {
 				cache.unlockForLoad(o);
@@ -285,7 +287,6 @@
     private AtomicInteger maxReserveKB = new AtomicInteger(1 << 18);
     private volatile int reserveBatchKB;
     private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a hint to set the reserveBatchKB
-    private long memoryBufferSpace; //used as a hint to account for batch overhead (only useful in large scenarios)
     private boolean useWeakReferences = true;
     private boolean inlineLobs = true;
     private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
@@ -297,10 +298,9 @@
     private AtomicInteger activeBatchKB = new AtomicInteger();
     
     private AtomicLong readAttempts = new AtomicLong();
-    //implements a LRFU cache using the a customized crf function.  we store the value with
-    //the cache entry to make a better decision about reuse of the batch
     //TODO: consider the size estimate in the weighting function
-    private OrderedCache<Long, CacheEntry> memoryEntries = new OrderedCache<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL, readAttempts);
+    LrfuEvictionQueue<CacheEntry> evictionQueue = new LrfuEvictionQueue<CacheEntry>(readAttempts);
+    ConcurrentHashMap<Long, CacheEntry> memoryEntries = new ConcurrentHashMap<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL);
     
     //limited size reference caches based upon the memory settings
     private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache; 
@@ -423,7 +423,7 @@
     	if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
     		LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating STree:", newID); //$NON-NLS-1$
     	}
-    	return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(elements), getProcessorBatchSize(elements.subList(0, keyLength)), keyLength, lobManager);
+    	return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes), getProcessorBatchSize(elements.subList(0, keyLength)), getProcessorBatchSize(elements), keyLength, lobManager);
     }
 
 	private static Class<?>[] getTypeClasses(final List elements) {
@@ -464,10 +464,6 @@
 		this.maxActivePlans = maxActivePlans;
 	}
     
-    public void setMemoryBufferSpace(long memoryBufferSpace) {
-		this.memoryBufferSpace = memoryBufferSpace;
-	}
-    
     public void setMaxProcessingKB(int maxProcessingKB) {
 		this.maxProcessingKB = maxProcessingKB;
 	}
@@ -488,8 +484,6 @@
 				this.maxReserveKB.addAndGet(((int)Math.max(0, (maxMemory - one_gig) * .75)));
 			}
 			this.maxReserveKB.addAndGet(((int)Math.max(0, Math.min(one_gig, maxMemory) * .5)));
-			int batchOverheadKB = (int)(this.memoryBufferSpace<0?(this.maxReserveKB.get()<<8):this.memoryBufferSpace)>>20;
-    		this.maxReserveKB.set(Math.max(0, this.maxReserveKB.get() - batchOverheadKB));
     	}
 		this.reserveBatchKB = this.getMaxReserveKB();
 		if (this.maxProcessingKBOrig == null) {
@@ -574,26 +568,34 @@
 		int maxToFree = Math.max(maxProcessingKB>>1, reserveBatchKB>>3);
 		int freed = 0;
 		while (freed <= maxToFree && activeBatchKB.get() > reserveBatchKB * .8) {
-			CacheEntry ce = memoryEntries.evict();
+			CacheEntry ce = evictionQueue.firstEntry(true);
 			if (ce == null) {
 				break;
 			}
-			freed += ce.getSizeEstimate();
-			activeBatchKB.addAndGet(-ce.getSizeEstimate());
+			if (!memoryEntries.containsKey(ce.getId())) {
+				continue; //not currently a valid eviction
+			}
+			boolean evicted = true;
 			try {
-				evict(ce);
+				evicted = evict(ce);
 			} catch (Throwable e) {
 				LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ ce.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
 			} finally {
-				this.memoryEntries.finishedEviction(ce.getId());
+				synchronized (ce) {
+					if (evicted && memoryEntries.remove(ce.getId()) != null) {
+						freed += ce.getSizeEstimate();
+						activeBatchKB.addAndGet(-ce.getSizeEstimate());
+						evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
+					}
+				}
 			}
 		}
 	}
 
-	void evict(CacheEntry ce) throws Exception {
+	boolean evict(CacheEntry ce) throws Exception {
 		Serializer<?> s = ce.getSerializer();
 		if (s == null) {
-			return;
+			return true;
 		}
 		boolean persist = false;
 		synchronized (ce) {
@@ -608,12 +610,13 @@
 				LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to storage, total writes: ", count); //$NON-NLS-1$
 			}
 		}
-		cache.add(ce, s);
+		boolean result = cache.add(ce, s);
 		if (s.useSoftCache()) {
 			createSoftReference(ce);
 		} else if (useWeakReferences) {
 			weakReferenceCache.getValue(ce); //a get will set the value
 		}
+		return result;
 	}
 
 	private void createSoftReference(CacheEntry ce) {
@@ -633,6 +636,17 @@
 			ce = memoryEntries.remove(batch);
 		}
 		if (ce != null) {
+			synchronized (ce) {
+				if (retain) {
+					//there is a minute chance the batch was evicted
+					//this call ensures that we won't leak
+					if (memoryEntries.containsKey(batch)) {
+						evictionQueue.touch(ce, false);
+					}
+				} else {
+					evictionQueue.remove(ce);
+				}
+			}
 			if (!retain) {
 				BufferManagerImpl.this.remove(ce, true);
 			}
@@ -655,7 +669,7 @@
 		if (ce != null && ce.getObject() != null) {
 			referenceHit.getAndIncrement();
 			if (retain) {
-				addMemoryEntry(ce, null);
+				addMemoryEntry(ce);
 			} else {
 				BufferManagerImpl.this.remove(ce, false);
 			}
@@ -687,13 +701,11 @@
 		}
 	}
 	
-	void addMemoryEntry(CacheEntry ce, CacheEntry previous) {
+	void addMemoryEntry(CacheEntry ce) {
 		persistBatchReferences();
 		synchronized (ce) {
-			if (previous != null) {
-				ce.setOrderingValue(previous.getOrderingValue());
-			}
 			memoryEntries.put(ce.getId(), ce);
+			evictionQueue.touch(ce, true);
 		}
 		activeBatchKB.getAndAdd(ce.getSizeEstimate());
 	}

Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	                        (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership.  Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ * 
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ * 
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.teiid.common.buffer.BaseCacheEntry;
+import org.teiid.common.buffer.CacheKey;
+
+/**
+ * A Concurrent LRFU eviction queue.  Has assumptions that match buffermanager usage.
+ * Null values are not allowed.
+ * @param <K>
+ * @param <V>
+ */
+public class LrfuEvictionQueue<V extends BaseCacheEntry> {
+	
+	//TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
+	//the level function limits the effective map size to ~ 2^16
+	//above which it performs comparably under multi-threaded load to a synchronized LinkedHashMap
+	//just with more CPU overhead vs. wait time.
+	protected NavigableMap<CacheKey, V> evictionQueue = new ConcurrentSkipListMap<CacheKey, V>();
+	protected AtomicLong clock;
+    //combined recency/frequency lamda value between 0 and 1 lower -> LFU, higher -> LRU
+    //TODO: adaptively adjust this value.  more hits should move closer to lru
+	protected float crfLamda = .0002f;
+	
+	public LrfuEvictionQueue(AtomicLong clock) {
+		this.clock = clock;
+	}
+
+	public boolean remove(V value) {
+		return evictionQueue.remove(value.getKey()) != null;
+	}
+	
+	public void touch(V value, boolean initial) {
+		if (!initial) {
+			initial = evictionQueue.remove(value.getKey()) == null;			
+		}
+		recordAccess(value, initial);
+		evictionQueue.put(value.getKey(), value);
+	}
+		
+	public Collection<V> getEvictionQueue() {
+		return evictionQueue.values();
+	}
+	
+	public V firstEntry(boolean poll) {
+		Map.Entry<CacheKey, V> entry = null;
+		if (poll) {
+			entry = evictionQueue.pollFirstEntry();
+		} else {
+			entry = evictionQueue.firstEntry();
+		}
+		if (entry != null) {
+			return entry.getValue();
+		}
+		return null;
+	}
+	
+	protected void recordAccess(V value, boolean initial) {
+		assert Thread.holdsLock(value);
+		CacheKey key = value.getKey();
+		float lastAccess = key.getLastAccess();
+		float currentClock = clock.get();
+		if (initial && lastAccess == 0) {
+			return; //we just want to timestamp this as created and not give it an ordering value
+		}
+		float orderingValue = key.getOrderingValue();
+		orderingValue = computeNextOrderingValue(currentClock, lastAccess,
+				orderingValue);
+		value.setKey(new CacheKey(key.getId(), currentClock, orderingValue));
+	}
+
+	float computeNextOrderingValue(float currentTime,
+			float lastAccess, float orderingValue) {
+		orderingValue = 
+			(float) (//Frequency component
+			orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
+			//recency component
+			+ Math.pow(currentTime, crfLamda));
+		return orderingValue;
+	}
+	
+	public float getCrfLamda() {
+		return crfLamda;
+	}
+	
+	public void setCrfLamda(float crfLamda) {
+		this.crfLamda = crfLamda;
+	}
+	
+}


Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -111,11 +111,12 @@
 	}
 	
 	@Override
-	public void add(CacheEntry entry, Serializer<?> s) {
+	public boolean add(CacheEntry entry, Serializer<?> s) {
 		Map<Long, CacheEntry> group = groups.get(s.getId());
 		if (group != null) {
 			group.put(entry.getId(), entry);
 		}
+		return true;
 	}
 	
 	@Override

Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -1,153 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.teiid.common.buffer.BaseCacheEntry;
-
-/**
- * A Concurrent LRFU cache.  Has assumptions that match buffermanager usage.
- * Null values are not allowed.
- * @param <K>
- * @param <V>
- */
-public class OrderedCache<K, V extends BaseCacheEntry> {
-	
-	protected Map<K, V> map; 
-	//TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
-	//the level limits the effective map size to ~ 2^16
-	//above which it performs comparably under load to a synchronized LinkedHashMap
-	//just with more CPU overhead vs. wait time.
-	protected NavigableMap<V, K> evictionQueue = new ConcurrentSkipListMap<V, K>();
-	protected Map<K, V> limbo;
-	protected AtomicLong clock;
-    //combined recency/frequency lamda value between 0 and 1 lower -> LFU, higher -> LRU
-    //TODO: adaptively adjust this value.  more hits should move closer to lru
-	protected float crfLamda = .0002f;
-	
-	public OrderedCache(int initialCapacity, float loadFactor, int concurrencyLevel, AtomicLong clock) {
-		map = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor, concurrencyLevel);
-		limbo = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor, concurrencyLevel);
-		this.clock = clock;
-	}
-		
-	public V get(K key) {
-		V result = map.get(key);
-		if (result == null) {
-			result = limbo.get(key);
-		}
-		if (result != null) {
-			synchronized (result) {
-				evictionQueue.remove(result);
-				recordAccess(result, false);
-				evictionQueue.put(result, key);
-			}
-		}
-		return result;
-	}
-	
-	public V remove(K key) {
-		V result = map.remove(key);
-		if (result != null) {
-			synchronized (result) {
-				evictionQueue.remove(result);
-			}
-		}
-		return result;
-	}
-	
-	public V put(K key, V value) {
-		V result = map.put(key, value);
-		if (result != null) {
-			synchronized (result) {
-				evictionQueue.remove(result);
-			}
-		}
-		synchronized (value) {
-			recordAccess(value, result == null);
-			evictionQueue.put(value, key);
-		}
-		return result;
-	}
-	
-	public V evict() {
-		Map.Entry<V, K> entry = evictionQueue.pollFirstEntry();
-		if (entry == null) {
-			return null;
-		}
-		limbo.put(entry.getValue(), entry.getKey());
-		return map.remove(entry.getValue());
-	}
-	
-	public void finishedEviction(K key) {
-		limbo.remove(key);
-	}
-	
-	public int size() {
-		return map.size();
-	}
-	
-	public Map<V, K> getEvictionQueue() {
-		return evictionQueue;
-	}
-	
-	public Map.Entry<V, K> firstEntry() {
-		return evictionQueue.firstEntry();
-	}
-	
-	protected void recordAccess(BaseCacheEntry value, boolean initial) {
-		float lastAccess = value.getLastAccess();
-		value.setLastAccess(clock.get());
-		if (initial && lastAccess == 0) {
-			return; //we just want to timestamp this as created and not give it an ordering value
-		}
-		float orderingValue = value.getOrderingValue();
-		orderingValue = computeNextOrderingValue(value.getLastAccess(), lastAccess,
-				orderingValue);
-		value.setOrderingValue(orderingValue);
-	}
-
-	float computeNextOrderingValue(float currentTime,
-			float lastAccess, float orderingValue) {
-		orderingValue = 
-			(float) (//Frequency component
-			orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
-			//recency component
-			+ Math.pow(currentTime, crfLamda));
-		return orderingValue;
-	}
-	
-	public float getCrfLamda() {
-		return crfLamda;
-	}
-	
-	public void setCrfLamda(float crfLamda) {
-		this.crfLamda = crfLamda;
-	}
-	
-}

Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-10-14 17:31:57 UTC (rev 3552)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java	2011-10-17 03:12:03 UTC (rev 3553)
@@ -68,8 +68,9 @@
     private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
     private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE>>20;
     private boolean inlineLobs = true;
-    private int memoryBufferSpace = -1;
+    private long memoryBufferSpace = -1;
     private int maxStorageObjectSize = BufferFrontedFileStoreCache.DEFAuLT_MAX_OBJECT_SIZE;
+    private boolean memoryBufferOffHeap;
 	private FileStorageManager fsm;
 	
     /**
@@ -93,7 +94,6 @@
             this.bufferMgr.setProcessorBatchSize(Integer.valueOf(processorBatchSize));
             this.bufferMgr.setMaxReserveKB(this.maxReserveKb);
             this.bufferMgr.setMaxProcessingKB(this.maxProcessingKb);
-            this.bufferMgr.setMemoryBufferSpace(Math.min(BufferFrontedFileStoreCache.MAX_ADDRESSABLE_MEMORY, this.memoryBufferSpace));
             this.bufferMgr.initialize();
             
             // If necessary, add disk storage manager
@@ -110,12 +110,20 @@
                 ssm.setMaxFileSize(maxFileSize);
                 BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
                 fsc.setMaxStorageObjectSize(maxStorageObjectSize);
+                fsc.setDirect(memoryBufferOffHeap);
+                int batchOverheadKB = (int)(this.memoryBufferSpace<0?(this.bufferMgr.getMaxReserveKB()<<8):this.memoryBufferSpace)>>20;
+        		this.bufferMgr.setMaxReserveKB(Math.max(0, this.bufferMgr.getMaxReserveKB() - batchOverheadKB));
                 if (memoryBufferSpace < 0) {
                 	//use approximately 25% of what's set aside for the reserved
-                	fsc.setMemoryBufferSpace(this.bufferMgr.getMaxReserveKB() << 8);
+                	fsc.setMemoryBufferSpace(((long)this.bufferMgr.getMaxReserveKB()) << 8);
                 } else {
-                	fsc.setMemoryBufferSpace(memoryBufferSpace);
+                	//scale from MB to bytes
+                	fsc.setMemoryBufferSpace(memoryBufferSpace << 20);
                 }
+                if (!memoryBufferOffHeap && this.maxReserveKb < 0) {
+            		//adjust the value
+            		this.bufferMgr.setMaxReserveKB(this.bufferMgr.getMaxReserveKB() - (int)Math.min(this.bufferMgr.getMaxReserveKB(), (fsc.getMemoryBufferSpace()>>10)));
+                }
                 fsc.setStorageManager(ssm);
                 fsc.initialize();
                 this.bufferMgr.setCache(fsc);
@@ -255,20 +263,29 @@
 		return bufferMgr.getReadAttempts();
 	}
 
-    @ManagementProperty(description="Direct memory buffer space used by the buffer manager in MB.  -1 determines the setting automatically from the maxReserveKB (default -1).  This value cannot be smaller than maxStorageObjectSize.")
-    public int getMemoryBufferSpace() {
+    @ManagementProperty(description="Memory buffer space used by the buffer manager in MB.  -1 determines the setting automatically from the maxReserveKB (default -1).  This value cannot be smaller than maxStorageObjectSize.")
+    public long getMemoryBufferSpace() {
 		return memoryBufferSpace;
 	}
-    
+
+    @ManagementProperty(description="The maximum size of a buffer managed object (typically a table page or a results batch) in bytes (default 8388608).")
     public int getMaxStorageObjectSize() {
 		return maxStorageObjectSize;
 	}
 
-    @ManagementProperty(description="The maximum size of a buffer managed object (typically a table page or a results batch) in bytes (default 8388608).")
-    public void setMemoryBufferSpace(int maxMemoryBufferSpace) {
-		this.memoryBufferSpace = maxMemoryBufferSpace;
+    @ManagementProperty(description="Set to true to hold the memory buffer off-heap. If true you must ensure that the VM can allocate that much direct memory (default false).")
+    public boolean isMemoryBufferOffHeap() {
+		return memoryBufferOffHeap;
 	}
+    
+    public void setMemoryBufferOffHeap(boolean memoryBufferOffHeap) {
+		this.memoryBufferOffHeap = memoryBufferOffHeap;
+	}
 
+    public void setMemoryBufferSpace(int memoryBufferSpace) {
+		this.memoryBufferSpace = memoryBufferSpace;
+	}
+
     public void setMaxStorageObjectSize(int maxStorageObjectSize) {
 		this.maxStorageObjectSize = maxStorageObjectSize;
 	}



More information about the teiid-commits mailing list