Author: shawkins
Date: 2011-10-16 23:12:03 -0400 (Sun, 16 Oct 2011)
New Revision: 3553
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
Removed:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
trunk/client/src/main/java/org/teiid/client/ResultsMessage.java
trunk/client/src/test/java/org/teiid/client/TestBatchSerializer.java
trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml
trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
Log:
TEIID-1750 correcting locking with an immutable key and updating jboss-beans
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-10-14
17:31:57 UTC (rev 3552)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-10-17
03:12:03 UTC (rev 3553)
@@ -48,6 +48,14 @@
However inline lob values are not supported by pre-7.6 clients, so disable
this
property if using older clients utilizing lobs. (default true) -->
<property name="inlineLobs">true</property>
+ <!-- Memory buffer space used by the buffer manager, in MB. A value of -1 determines the
setting automatically from maxReserveKB (default -1).
+ This value cannot be smaller than maxStorageObjectSize. -->
+ <property name="maxMemoryBufferSpace">-1</property>
+ <!-- Set to true to hold the memory buffer off-heap. If true, you must ensure
that the VM can allocate that much direct memory (default false). -->
+ <property name="memoryBufferOffHeap">false</property>
+ <!-- The maximum size of a buffer managed object (typically a table page or a
results batch) in bytes (default 8388608 or 8MB).
+ Setting this value too high will reduce the effectiveness of the memory
buffer.-->
+ <property name="maxStorageObjectSize">8388608</property>
</bean>
<bean name="CacheFactory"
class="org.teiid.cache.jboss.ClusterableCacheFactory">
Modified: trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/BatchSerializer.java 2011-10-14 17:31:57
UTC (rev 3552)
+++ trunk/client/src/main/java/org/teiid/client/BatchSerializer.java 2011-10-17 03:12:03
UTC (rev 3553)
@@ -22,21 +22,28 @@
package org.teiid.client;
+import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.io.ObjectStreamConstants;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TimeZone;
import org.teiid.core.TeiidRuntimeException;
+import org.teiid.core.types.BlobType;
+import org.teiid.core.types.ClobType;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.types.XMLType;
import org.teiid.jdbc.JDBCPlugin;
@@ -67,6 +74,90 @@
serializers.put(DataTypeManager.DefaultDataTypes.TIMESTAMP, new
TimestampColumnSerializer());
}
+ private static final Map<String, ColumnSerializer> version1serializers = new
HashMap<String, ColumnSerializer>(128);
+ static {
+ version1serializers.put(DataTypeManager.DefaultDataTypes.DATE, new
DateColumnSerializer1());
+ version1serializers.put(DataTypeManager.DefaultDataTypes.TIME, new
TimeColumnSerializer1());
+ version1serializers.put(DataTypeManager.DefaultDataTypes.STRING, new
StringColumnSerializer1());
+ version1serializers.put(DataTypeManager.DefaultDataTypes.CLOB, new
ClobColumnSerializer1());
+ version1serializers.put(DataTypeManager.DefaultDataTypes.BLOB, new
BlobColumnSerializer1());
+ version1serializers.put(DataTypeManager.DefaultDataTypes.XML, new
XmlColumnSerializer1());
+ version1serializers.put(DataTypeManager.DefaultDataTypes.NULL, new
NullColumnSerializer1());
+ //TODO: do better with just an object column
+ }
+
+ private static final int MAX_UTF = 0xFFFF/3; //this is greater than the expected max
length of Teiid Strings
+
+ private static class StringColumnSerializer1 extends ColumnSerializer {
+ @Override
+ protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+ String str = (String)obj;
+ if (str.length() <= MAX_UTF) {
+ //skip object serialization if we have a short string
+ out.writeByte(ObjectStreamConstants.TC_STRING);
+ out.writeUTF(str);
+ } else {
+ out.writeByte(ObjectStreamConstants.TC_LONGSTRING);
+ out.writeObject(obj);
+ }
+ }
+
+ @Override
+ protected Object readObject(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ if (in.readByte() == ObjectStreamConstants.TC_STRING) {
+ return in.readUTF();
+ }
+ return super.readObject(in);
+ }
+ }
+
+ private static class NullColumnSerializer1 extends ColumnSerializer {
+ @Override
+ public void writeColumn(ObjectOutput out, int col,
+ List<? extends List<?>> batch) throws IOException {
+ }
+
+ @Override
+ public void readColumn(ObjectInput in, int col,
+ List<List<Object>> batch, byte[] isNull) throws IOException,
+ ClassNotFoundException {
+ }
+ }
+
+ private static class ClobColumnSerializer1 extends ColumnSerializer {
+ protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+ ((Externalizable)obj).writeExternal(out);
+ }
+ protected Object readObject(ObjectInput in) throws IOException,
ClassNotFoundException {
+ ClobType ct = new ClobType();
+ ct.readExternal(in);
+ return ct;
+ }
+ }
+
+ private static class BlobColumnSerializer1 extends ColumnSerializer {
+ protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+ ((Externalizable)obj).writeExternal(out);
+ }
+ protected Object readObject(ObjectInput in) throws IOException,
ClassNotFoundException {
+ BlobType bt = new BlobType();
+ bt.readExternal(in);
+ return bt;
+ }
+ }
+
+ private static class XmlColumnSerializer1 extends ColumnSerializer {
+ protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+ ((Externalizable)obj).writeExternal(out);
+ }
+ protected Object readObject(ObjectInput in) throws IOException,
ClassNotFoundException {
+ XMLType xt = new XMLType();
+ xt.readExternal(in);
+ return xt;
+ }
+ }
+
/**
* Packs the (boolean) information about whether data values in the column are null
* into bytes so that we send ~n/8 instead of n bytes.
@@ -338,6 +429,34 @@
}
}
+ static int DATE_NORMALIZER = 0;
+
+ static {
+ Calendar c = Calendar.getInstance();
+ c.setTimeZone(TimeZone.getTimeZone("GMT")); //$NON-NLS-1$
+ c.set(1900, 0, 1, 0, 0, 0);
+ c.set(Calendar.MILLISECOND, 0);
+ DATE_NORMALIZER = -(int)(c.getTime().getTime()/60000); //support a 32 bit range
starting at this value
+ }
+
+ private static class DateColumnSerializer1 extends ColumnSerializer {
+ protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+ out.writeInt((int)(((java.sql.Date)obj).getTime()/60000) + DATE_NORMALIZER);
+ }
+ protected Object readObject(ObjectInput in) throws IOException {
+ return new java.sql.Date(((in.readInt()&0xffffffffL) -
DATE_NORMALIZER)*60000);
+ }
+ }
+
+ private static class TimeColumnSerializer1 extends ColumnSerializer {
+ protected void writeObject(ObjectOutput out, Object obj) throws IOException {
+ out.writeInt((int)(((Time)obj).getTime()/1000));
+ }
+ protected Object readObject(ObjectInput in) throws IOException {
+ return new Time((in.readInt()&0xffffffffL)*1000);
+ }
+ }
+
private static class TimestampColumnSerializer extends ColumnSerializer {
protected void writeObject(ObjectOutput out, Object obj) throws IOException {
Timestamp ts = (Timestamp)obj;
@@ -351,15 +470,25 @@
}
}
- private static ColumnSerializer getSerializer(String type) {
- ColumnSerializer cs = serializers.get((type == null) ?
DataTypeManager.DefaultDataTypes.OBJECT : type);
+ private static ColumnSerializer getSerializer(String type, byte version) {
+ ColumnSerializer cs = null;
+ if (version == 1) {
+ cs = version1serializers.get((type == null) ?
DataTypeManager.DefaultDataTypes.OBJECT : type);
+ }
+ if (cs == null) {
+ cs = serializers.get((type == null) ? DataTypeManager.DefaultDataTypes.OBJECT :
type);
+ }
if (cs == null) {
return defaultSerializer;
}
return cs;
}
+
+ public static void writeBatch(ObjectOutput out, String[] types, List<? extends
List<?>> batch) throws IOException {
+ writeBatch(out, types, batch, (byte)1);
+ }
- public static void writeBatch(ObjectOutput out, String[] types, List<? extends
List<?>> batch) throws IOException {
+ public static void writeBatch(ObjectOutput out, String[] types, List<? extends
List<?>> batch, byte version) throws IOException {
if (batch == null) {
out.writeInt(-1);
} else {
@@ -368,7 +497,7 @@
int columns = types.length;
out.writeInt(columns);
for(int i = 0; i < columns; i++) {
- ColumnSerializer serializer = getSerializer(types[i]);
+ ColumnSerializer serializer = getSerializer(types[i], version);
try {
serializer.writeColumn(out, i, batch);
} catch (ClassCastException e) {
@@ -389,6 +518,10 @@
}
public static List<List<Object>> readBatch(ObjectInput in, String[]
types) throws IOException, ClassNotFoundException {
+ return readBatch(in, types, (byte)1);
+ }
+
+ public static List<List<Object>> readBatch(ObjectInput in, String[]
types, byte version) throws IOException, ClassNotFoundException {
int rows = in.readInt();
if (rows == 0) {
return new ArrayList<List<Object>>(0);
@@ -402,7 +535,7 @@
}
byte[] isNullBuffer = new byte[(extraRows > 0) ? numBytes + 1: numBytes];
for (int col = 0; col < columns; col++) {
- getSerializer(types[col]).readColumn(in, col, batch, isNullBuffer);
+ getSerializer(types[col], version).readColumn(in, col, batch,
isNullBuffer);
}
return batch;
}
Modified: trunk/client/src/main/java/org/teiid/client/ResultsMessage.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/ResultsMessage.java 2011-10-14 17:31:57
UTC (rev 3552)
+++ trunk/client/src/main/java/org/teiid/client/ResultsMessage.java 2011-10-17 03:12:03
UTC (rev 3553)
@@ -240,7 +240,7 @@
dataTypes = ExternalizeUtil.readStringArray(in);
// Row data
- results = BatchSerializer.readBatch(in, dataTypes);
+ results = BatchSerializer.readBatch(in, dataTypes, (byte)0);
// Plan Descriptions
planDescription = (PlanNode)in.readObject();
@@ -272,7 +272,7 @@
ExternalizeUtil.writeArray(out, dataTypes);
// Results data
- BatchSerializer.writeBatch(out, dataTypes, results);
+ BatchSerializer.writeBatch(out, dataTypes, results, (byte)0);
// Plan descriptions
out.writeObject(this.planDescription);
Modified: trunk/client/src/test/java/org/teiid/client/TestBatchSerializer.java
===================================================================
--- trunk/client/src/test/java/org/teiid/client/TestBatchSerializer.java 2011-10-14
17:31:57 UTC (rev 3552)
+++ trunk/client/src/test/java/org/teiid/client/TestBatchSerializer.java 2011-10-17
03:12:03 UTC (rev 3553)
@@ -29,15 +29,13 @@
import java.io.ObjectOutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import junit.framework.TestCase;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.TimestampWithTimezone;
@@ -47,22 +45,6 @@
*/
public class TestBatchSerializer extends TestCase {
- private static void assertEqual(List[] expectedBatch, List[] batch) {
- if (expectedBatch == null) {
- assertNull(batch);
- return;
- }
- assertEquals(expectedBatch.length, batch.length);
- if (expectedBatch.length > 0) {
- int columns = expectedBatch[0].size();
- for (int row = 0; row < expectedBatch.length; row++) {
- for (int col = 0; col < columns; col++) {
- assertEquals(expectedBatch[row].get(col), batch[row].get(col));
- }
- }
- }
- }
-
private static void helpTestSerialization(String[] types, List<?>[] batch)
throws IOException, ClassNotFoundException {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(byteStream);
@@ -108,21 +90,21 @@
List[] batch = new List[rows];
for (int i = 0; i < rows; i++) {
- long currentTime = System.currentTimeMillis();
+ java.util.Date d = new java.util.Date();
Object[] data = { new BigDecimal("" + i), //$NON-NLS-1$
new BigInteger(Integer.toString(i)),
(i%2 == 0) ? Boolean.FALSE: Boolean.TRUE,
new Byte((byte)i),
new Character((char)i),
- new Date(currentTime),
+ TimestampWithTimezone.createDate(d),
new Double(i),
new Float(i),
new Integer(i),
new Long(i),
new Short((short)i),
sampleString(i),
- new Time(currentTime),
- new Timestamp(currentTime)
+ TimestampWithTimezone.createTime(d),
+ TimestampWithTimezone.createTimestamp(d)
};
batch[i] = Arrays.asList(data);
}
@@ -133,22 +115,22 @@
List[] batch = new List[rows];
for (int i = 0; i < rows; i++) {
- long currentTime = System.currentTimeMillis();
+ java.util.Date d = new java.util.Date();
int mod = i%14;
Object[] data = { (mod == 0) ? null : new BigDecimal("" + i),
//$NON-NLS-1$
(mod == 1) ? null : new BigInteger(Integer.toString(i)),
(mod == 2) ? null : ((i%2 == 0) ? Boolean.FALSE:
Boolean.TRUE),
(mod == 3) ? null : new Byte((byte)i),
(mod == 4) ? null : new Character((char)i),
- (mod == 5) ? null : new Date(currentTime),
+ (mod == 5) ? null : TimestampWithTimezone.createDate(d),
(mod == 6) ? null : new Double(i),
(mod == 7) ? null : new Float(i),
(mod == 8) ? null : new Integer(i),
(mod == 9) ? null : new Long(i),
(mod == 10) ? null : new Short((short)i),
(mod == 11) ? null : sampleString(i),
- (mod == 12) ? null : new Time(currentTime),
- (mod == 13) ? null : new Timestamp(currentTime)
+ (mod == 12) ? null : TimestampWithTimezone.createTime(d),
+ (mod == 13) ? null :
TimestampWithTimezone.createTimestamp(d)
};
batch[i] = Arrays.asList(data);
}
Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml
===================================================================
---
trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml 2011-10-14
17:31:57 UTC (rev 3552)
+++
trunk/documentation/admin-guide/src/main/docbook/en-US/content/installation.xml 2011-10-17
03:12:03 UTC (rev 3553)
@@ -168,8 +168,8 @@
<section>
<title>tmp/teiid</title>
<para>This directory contains temporary files created by Teiid. These are
mostly created by the buffer manager.
- These files are not needed across a VM restart. Heavy usage of large data sets
will create lots of temporary files.
- In heavy usage scenarios, consider pointing the buffer directory at a partition
that is routinely defragmented.
+ These files are not needed across a VM restart. Creation of Teiid lob values
(for example through SQL/XML) will typically create one file per lob once it exceeds the
allowable
+ in-memory size of 8KB. In heavy usage scenarios, consider pointing the buffer
directory at a partition that is routinely defragmented.
</para>
</section>
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java 2011-10-14
17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BaseCacheEntry.java 2011-10-17
03:12:03 UTC (rev 3553)
@@ -22,56 +22,34 @@
package org.teiid.common.buffer;
-public class BaseCacheEntry implements Comparable<BaseCacheEntry> {
+public class BaseCacheEntry {
- private Long id;
- protected float lastAccess;
- protected float orderingValue;
-
- public BaseCacheEntry(Long id) {
- this.id = id;
+ private CacheKey key;
+
+ public BaseCacheEntry(CacheKey key) {
+ this.key = key;
}
public Long getId() {
- return id;
+ return key.getId();
}
@Override
public int hashCode() {
- return getId().hashCode();
+ return key.hashCode();
}
@Override
public String toString() {
- return getId().toString();
+ return key.toString();
}
- public float getLastAccess() {
- return lastAccess;
+ public void setKey(CacheKey key) {
+ this.key = key;
}
- public void setLastAccess(float lastAccess) {
- this.lastAccess = lastAccess;
+ public CacheKey getKey() {
+ return key;
}
-
- public float getOrderingValue() {
- return orderingValue;
- }
-
- public void setOrderingValue(float orderingValue) {
- this.orderingValue = orderingValue;
- }
-
- @Override
- public int compareTo(BaseCacheEntry o) {
- int result = (int) Math.signum(orderingValue - o.orderingValue);
- if (result == 0) {
- result = (int)Math.signum(lastAccess - o.lastAccess);
- if (result == 0) {
- return Long.signum(id - o.id);
- }
- }
- return result;
- }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-10-14 17:31:57 UTC
(rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-10-17 03:12:03 UTC
(rev 3553)
@@ -82,7 +82,7 @@
* @param s
* @throws Exception
*/
- void add(CacheEntry entry, Serializer<?> s) throws Exception;
+ boolean add(CacheEntry entry, Serializer<?> s) throws Exception;
/**
* Remove an entry from the cache
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java 2011-10-14 17:31:57
UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheEntry.java 2011-10-17 03:12:03
UTC (rev 3553)
@@ -30,10 +30,14 @@
private int sizeEstimate;
private WeakReference<? extends Serializer<?>> serializer;
- public CacheEntry(Long id) {
- super(id);
+ public CacheEntry(Long oid) {
+ super(new CacheKey(oid, 0, 0));
}
+ public CacheEntry(CacheKey key) {
+ super(key);
+ }
+
public int getSizeEstimate() {
return sizeEstimate;
}
@@ -42,17 +46,6 @@
this.sizeEstimate = sizeEstimate;
}
- public boolean equals(Object obj) {
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof CacheEntry)) {
- return false;
- }
- return getId().equals(((CacheEntry)obj).getId());
- }
-
-
public Object nullOut() {
Object result = getObject();
setObject(null);
Added: trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java 2011-10-17 03:12:03
UTC (rev 3553)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+public class CacheKey implements Comparable<CacheKey> {
+
+ private Long id;
+ protected float lastAccess;
+ protected float orderingValue;
+
+ public CacheKey(Long id, float lastAccess, float orderingValue) {
+ this.id = id;
+ this.lastAccess = lastAccess;
+ this.orderingValue = orderingValue;
+ }
+
+ public Long getId() {
+ return id;
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return id.toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof CacheKey)) {
+ return false;
+ }
+ return this.id.equals(((CacheKey)obj).getId());
+ }
+
+ public float getLastAccess() {
+ return lastAccess;
+ }
+
+ public float getOrderingValue() {
+ return orderingValue;
+ }
+
+ @Override
+ public int compareTo(CacheKey o) {
+ int result = (int) Math.signum(orderingValue - o.orderingValue);
+ if (result == 0) {
+ result = (int)Math.signum(lastAccess - o.lastAccess);
+ if (result == 0) {
+ return Long.signum(id - o.id);
+ }
+ }
+ return result;
+ }
+
+}
\ No newline at end of file
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/CacheKey.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-14
17:31:57 UTC (rev 3552)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-17
03:12:03 UTC (rev 3553)
@@ -45,6 +45,7 @@
import org.teiid.common.buffer.BaseCacheEntry;
import org.teiid.common.buffer.Cache;
import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.CacheKey;
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.Serializer;
import org.teiid.common.buffer.StorageManager;
@@ -92,6 +93,10 @@
* we should at least reclaim tail space if the end block is removed. for now we are
just relying
* on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of
the
* files.
+ *
+ * The locking is as fine grained as possible to prevent contention. See {@link
PhysicalInfo} for
+ * flags that are used when it is used as a lock. It is important to not access the
+ * group maps when a {@link PhysicalInfo} lock is held.
*/
public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>,
StorageManager {
@@ -107,7 +112,8 @@
static final int DIRECT_POINTERS = 14;
static final int EMPTY_ADDRESS = -1;
- //TODO allow the block size to be configurable
+ //TODO allow the block size to be configurable. 8k is a reasonable default up to a gig,
but we could be more efficient with larger blocks from there.
+ //the rationale for a smaller block size is to reduce internal fragmentation, which is
critical when maintaining a relatively small buffer < 256MB
static final int LOG_BLOCK_SIZE = 13;
public static final long MAX_ADDRESSABLE_MEMORY =
1l<<(ADDRESS_BITS+LOG_BLOCK_SIZE);
@@ -151,7 +157,7 @@
private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
if (index >= MAX_DOUBLE_INDIRECT || (mode == Mode.ALLOCATE && index >=
maxMemoryBlocks)) {
- throw new TeiidRuntimeException("Max block number exceeded");
//$NON-NLS-1$
+ throw new TeiidRuntimeException("Max block number exceeded. Increase the
maxStorageObjectSize to support larger storage objects. Alternatively you could make the
processor batch size smaller."); //$NON-NLS-1$
}
int dataBlock = 0;
int position = 0;
@@ -335,7 +341,7 @@
private int maxMemoryBlocks;
private AtomicLong readAttempts = new AtomicLong();
- private OrderedCache<Long, PhysicalInfo> memoryBufferEntries = new
OrderedCache<Long, PhysicalInfo>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL,
readAttempts);
+ LrfuEvictionQueue<PhysicalInfo> memoryBufferEntries = new
LrfuEvictionQueue<PhysicalInfo>(readAttempts);
private Semaphore memoryWritePermits; //prevents deadlock waiting for free blocks
private ReentrantReadWriteLock memoryEvictionLock = new ReentrantReadWriteLock(true);
@@ -351,7 +357,7 @@
private BlockStore[] sizeBasedStores;
private AtomicBoolean cleanerRunning = new AtomicBoolean();
- private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, 0,
"FileStore Worker"); //$NON-NLS-1$
+ private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, "FileStore
Worker"); //$NON-NLS-1$
private final Runnable cleaningTask = new Runnable() {
@Override
@@ -410,7 +416,7 @@
boolean lowBlocks(boolean critical) {
int bitsSet = blocksInuse.getBitsSet();
- return bitsSet > 0 && (blocks - bitsSet <
(critical?criticalCleaningThreshold:cleaningThreshold)) &&
memoryBufferEntries.firstEntry() != null;
+ return bitsSet > 0 && (blocks - bitsSet <
(critical?criticalCleaningThreshold:cleaningThreshold)) &&
memoryBufferEntries.firstEntry(false) != null;
}
InodeBlockManager getBlockManager(long gid, long oid, int inode) {
@@ -419,10 +425,11 @@
@SuppressWarnings("unchecked")
@Override
- public void add(CacheEntry entry, Serializer s) {
+ public boolean add(CacheEntry entry, Serializer s) {
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "adding object", s.getId(),
entry.getId()); //$NON-NLS-1$
}
+ boolean newEntry = false;
InodeBlockManager blockManager = null;
boolean hasPermit = false;
PhysicalInfo info = null;
@@ -430,22 +437,32 @@
try {
Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
if (map == null) {
- return; //already removed
+ return true; //already removed
}
info = map.get(entry.getId());
if (info == null) {
- if (!map.containsKey(entry.getId())) {
- return; //already removed
+ synchronized (map) {
+ info = map.get(entry.getId());
+ if (info == null) {
+ newEntry = true;
+ if (!map.containsKey(entry.getId())) {
+ return true; //already removed
+ }
+ info = new PhysicalInfo(s.getId(), entry.getId(), EMPTY_ADDRESS);
+ map.put(entry.getId(), info);
+ }
}
- } else {
+ }
+ if (!newEntry) {
synchronized (info) {
- //we assume that serialization would be faster than a disk read
- if (info.inode != EMPTY_ADDRESS || !shouldPlaceInMemoryBuffer(0, info)) {
- success = true;
- return;
+ if (info.inode == EMPTY_ADDRESS && info.block == EMPTY_ADDRESS) {
+ return false; //someone else is responsible for adding this cache entry
}
- //we should not be in memory since there is no inode assigned
- assert !memoryBufferEntries.getEvictionQueue().containsKey(info);
+ if (info.evicting || info.inode != EMPTY_ADDRESS
+ || !shouldPlaceInMemoryBuffer(0, info)) {
+ return true; //safe to remove from tier 1
+ }
+ //second chance re-add to the cache, we assume that serialization would be faster
than a disk read
}
}
//proactively create freespace
@@ -465,21 +482,18 @@
memoryWritePermits.acquire();
hasPermit = true;
blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
- ExtensibleBufferedOutputStream fsos = new BlockOutputStream(blockManager);
- ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ BlockOutputStream bos = new BlockOutputStream(blockManager);
+ ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeInt(entry.getSizeEstimate());
s.serialize(entry.getObject(), oos);
oos.close();
+ //synchronized to ensure proper cleanup from a concurrent removal
synchronized (map) {
- //synchronize to ensure proper cleanup from a concurrent removal
if (physicalMapping.containsKey(s.getId()) &&
map.containsKey(entry.getId())) {
- if (info == null) {
- info = new PhysicalInfo(s.getId(), entry.getId(), blockManager.getInode(),
fsos.getBytesWritten());
- map.put(entry.getId(), info);
- }
synchronized (info) {
info.inode = blockManager.getInode();
- memoryBufferEntries.put(entry.getId(), info);
+ info.setSize(bos.getBytesWritten());
+ memoryBufferEntries.touch(info, newEntry);
}
success = true;
}
@@ -494,6 +508,7 @@
blockManager.free(false);
}
}
+ return true;
}
@Override
@@ -541,16 +556,17 @@
boolean inStorage = false;
try {
synchronized (info) {
+ await(info, true, false);
if (info.inode != EMPTY_ADDRESS) {
info.pinned = true;
- PhysicalInfo existing = memoryBufferEntries.get(info.getId()); //touch this entry
- assert existing == info;
+ memoryBufferEntries.touch(info, false);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_DQP, "Getting object at inode",
info.inode, serializer.getId(), oid); //$NON-NLS-1$
}
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
is = new BlockInputStream(manager, info.memoryBlockCount);
} else if (info.block != EMPTY_ADDRESS) {
+ assert !info.pinned;
inStorage = true;
storageReads.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.DETAIL)) {
@@ -574,7 +590,7 @@
synchronized (info) {
info.inode = manager.getInode();
info.pinned = true;
- memoryBufferEntries.put(info.getId(), info);
+ memoryBufferEntries.touch(info, false);
}
is = new BlockInputStream(manager, info.memoryBlockCount);
success = true;
@@ -582,15 +598,15 @@
this.memoryWritePermits.release();
if (!success && manager != null) {
manager.free(false);
- info.inode = EMPTY_ADDRESS;
+ synchronized (info) {
+ info.inode = EMPTY_ADDRESS;
+ }
}
}
}
- CacheEntry ce = new CacheEntry(oid);
+ CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1));
ObjectInputStream ois = new ObjectInputStream(is);
ce.setSizeEstimate(ois.readInt());
- ce.setLastAccess(1);
- ce.setOrderingValue(1);
ce.setObject(serializer.deserialize(ois));
ce.setPersistent(true);
return ce;
@@ -614,10 +630,11 @@
* @return
*/
private boolean shouldPlaceInMemoryBuffer(long currentTime, PhysicalInfo info) {
- Map.Entry<PhysicalInfo, Long> lowest = memoryBufferEntries.firstEntry();
+ PhysicalInfo lowest = memoryBufferEntries.firstEntry(false);
+ CacheKey key = info.getKey();
return (blocksInuse.getTotalBits() - blocksInuse.getBitsSet()) >
(criticalCleaningThreshold + info.memoryBlockCount)
- || (lowest != null && lowest.getKey().block != EMPTY_ADDRESS
- && lowest.getKey().getOrderingValue() <
(currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime,
info.getLastAccess(), info.getOrderingValue()):info.getOrderingValue()));
+ || (lowest != null && lowest.block != EMPTY_ADDRESS
+ && lowest.getKey().getOrderingValue() <
(currentTime>0?memoryBufferEntries.computeNextOrderingValue(currentTime,
key.getLastAccess(), key.getOrderingValue()):key.getOrderingValue()));
}
@Override
@@ -650,7 +667,7 @@
return;
}
PhysicalInfo info = map.remove(id);
- free(id, info, false, false);
+ free(info, false, false);
}
@Override
@@ -661,7 +678,7 @@
}
synchronized (map) {
for (Map.Entry<Long, PhysicalInfo> entry : map.entrySet()) {
- free(entry.getKey(), entry.getValue(), false, false);
+ free(entry.getValue(), false, false);
}
return map.keySet();
}
@@ -673,28 +690,39 @@
* demote && acquireDataBlock -> push out of memory and reuse a datablock
* !demote -> full removal from memory and disk
*/
- int free(Long oid, PhysicalInfo info, boolean demote, boolean acquireDataBlock) {
+ int free(PhysicalInfo info, boolean demote, boolean acquireDataBlock) {
if (info == null) {
return EMPTY_ADDRESS;
}
+ Long oid = info.getId();
int result = EMPTY_ADDRESS;
BlockManager bm = null;
int block = EMPTY_ADDRESS;
- try {
- int memoryBlockCount;
- int sizeIndex;
- synchronized (info) {
+ int memoryBlockCount;
+ int sizeIndex;
+ synchronized (info) {
+ //if we're a demotion then the free flag was already checked and set
+ if (!demote) {
+ //let a pending free finish - it would be nice if we could pre-empt
+ //since we can save some work, but this should be rare enough
+ //to just block
+ await(info, false, true);
info.evicting = true;
- block = info.block;
- memoryBlockCount = info.memoryBlockCount;
- sizeIndex = info.sizeIndex;
- if (info.inode != EMPTY_ADDRESS) {
- bm = getBlockManager(info.gid, oid, info.inode);
- } else if (demote) {
- return EMPTY_ADDRESS;
- }
- //release the lock to perform the transfer
+ } else {
+ assert info.evicting;
}
+ block = info.block;
+ memoryBlockCount = info.memoryBlockCount;
+ sizeIndex = info.sizeIndex;
+ if (info.inode != EMPTY_ADDRESS) {
+ bm = getBlockManager(info.gid, oid, info.inode);
+ } else if (demote) {
+ return EMPTY_ADDRESS;
+ }
+ //release the lock to perform the transfer
+ //for straight removals this is a little wasteful
+ }
+ try {
if (demote && block == EMPTY_ADDRESS) {
storageWrites.getAndIncrement();
BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
@@ -724,8 +752,11 @@
await(info, true, false);
info.evicting = false;
info.notifyAll();
- info.inode = EMPTY_ADDRESS;
- memoryBufferEntries.remove(info.getId());
+ assert bm == null || info.inode != EMPTY_ADDRESS;
+ if (info.inode != EMPTY_ADDRESS) {
+ info.inode = EMPTY_ADDRESS;
+ memoryBufferEntries.remove(info);
+ }
if (block != EMPTY_ADDRESS) {
if (demote) {
info.block = block;
@@ -762,9 +793,8 @@
}
/**
- * Eviction routine. When space is exhausted datablocks are stolen from
- * memory entries
- * starvation.
+ * Eviction routine. When space is exhausted data blocks are acquired from
+ * memory entries.
* @param acquire
* @return
*/
@@ -776,10 +806,9 @@
//doing a cleanup may trigger the purging of resources
AutoCleanupUtil.doCleanup();
//scan the eviction queue looking for a victim
- Iterator<Map.Entry<PhysicalInfo, Long>> iter =
memoryBufferEntries.getEvictionQueue().entrySet().iterator();
+ Iterator<PhysicalInfo> iter =
memoryBufferEntries.getEvictionQueue().iterator();
while (((!acquire && lowBlocks(false)) || (acquire && (next =
blocksInuse.getAndSetNextClearBit()) == EMPTY_ADDRESS)) && iter.hasNext()) {
- Map.Entry<PhysicalInfo, Long> entry = iter.next();
- PhysicalInfo info = entry.getKey();
+ PhysicalInfo info = iter.next();
synchronized (info) {
if (info.inode == EMPTY_ADDRESS) {
continue;
@@ -803,7 +832,7 @@
//mark as evicting early so that other evictFromMemoryCalls don't select this
same entry
info.evicting = true;
}
- next = free(entry.getValue(), info, true, acquire);
+ next = free(info, true, acquire);
if (!acquire) {
next = 0; //let the cleaner know that we made progress
}
@@ -853,18 +882,22 @@
return storageWrites.get();
}
+ public long getMemoryBufferSpace() {
+ return memoryBufferSpace;
+ }
+
}
/**
* Represents the memory buffer and storage state of an object.
* It is important to minimize the amount of data held here.
- * Currently should be 40 bytes.
+ * Currently should be 48 bytes.
*/
final class PhysicalInfo extends BaseCacheEntry {
final Long gid;
//the memory inode and block count
int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
- final int memoryBlockCount;
+ int memoryBlockCount;
//the storage block and BlockStore index
int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
byte sizeIndex = 0;
@@ -873,10 +906,13 @@
boolean evicting; //indicates that the entry will be moved out of the memory buffer
boolean loading; //used by tier 1 cache to prevent double loads
- public PhysicalInfo(Long gid, Long id, int inode, int size) {
- super(id);
+ public PhysicalInfo(Long gid, Long id, int inode) {
+ super(new CacheKey(id, 0, 0));
this.inode = inode;
this.gid = gid;
+ }
+
+ public void setSize(int size) {
this.memoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) +
((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
int blocks = memoryBlockCount;
while (blocks >= 1) {
@@ -884,4 +920,5 @@
blocks>>=2;
}
}
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-14
17:31:57 UTC (rev 3552)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-17
03:12:03 UTC (rev 3553)
@@ -51,6 +51,7 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.Cache;
import org.teiid.common.buffer.CacheEntry;
+import org.teiid.common.buffer.CacheKey;
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.LobManager;
import org.teiid.common.buffer.STree;
@@ -120,7 +121,7 @@
@Override
public void setPrefersMemory(boolean prefers) {
- //TODO: it's only expected to move from not preferring to prefefring
+ //TODO: it's only expected to move from not preferring to preferring
this.prefersMemory.set(prefers);
}
@@ -140,10 +141,6 @@
throws TeiidComponentException {
int sizeEstimate = getSizeEstimate(batch);
Long oid = batchAdded.getAndIncrement();
- CacheEntry ce = new CacheEntry(oid);
- ce.setObject(batch);
- ce.setSizeEstimate(sizeEstimate);
- ce.setSerializer(this.ref);
CacheEntry old = null;
if (previous != null) {
if (removeOld) {
@@ -152,11 +149,16 @@
old = fastGet(previous, prefersMemory.get(), true);
}
}
+ CacheKey key = new CacheKey(oid, 0, old!=null?old.getKey().getOrderingValue():0);
+ CacheEntry ce = new CacheEntry(key);
+ ce.setObject(batch);
+ ce.setSizeEstimate(sizeEstimate);
+ ce.setSerializer(this.ref);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE))
{
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to
BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate());
//$NON-NLS-1$ //$NON-NLS-2$
}
cache.addToCacheGroup(id, ce.getId());
- addMemoryEntry(ce, old);
+ addMemoryEntry(ce);
return oid;
}
@@ -234,7 +236,7 @@
ce.setSerializer(this.ref);
ce.setPersistent(true);
if (retain) {
- addMemoryEntry(ce, null);
+ addMemoryEntry(ce);
}
} finally {
cache.unlockForLoad(o);
@@ -285,7 +287,6 @@
private AtomicInteger maxReserveKB = new AtomicInteger(1 << 18);
private volatile int reserveBatchKB;
private int maxActivePlans = DQPConfiguration.DEFAULT_MAX_ACTIVE_PLANS; //used as a
hint to set the reserveBatchKB
- private long memoryBufferSpace; //used as a hint to account for batch overhead (only
useful in large scenarios)
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
@@ -297,10 +298,9 @@
private AtomicInteger activeBatchKB = new AtomicInteger();
private AtomicLong readAttempts = new AtomicLong();
- //implements a LRFU cache using the a customized crf function. we store the value
with
- //the cache entry to make a better decision about reuse of the batch
//TODO: consider the size estimate in the weighting function
- private OrderedCache<Long, CacheEntry> memoryEntries = new
OrderedCache<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL, readAttempts);
+ LrfuEvictionQueue<CacheEntry> evictionQueue = new
LrfuEvictionQueue<CacheEntry>(readAttempts);
+ ConcurrentHashMap<Long, CacheEntry> memoryEntries = new
ConcurrentHashMap<Long, CacheEntry>(16, .75f, CONCURRENCY_LEVEL);
//limited size reference caches based upon the memory settings
private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache;
@@ -423,7 +423,7 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Creating STree:",
newID); //$NON-NLS-1$
}
- return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes),
getProcessorBatchSize(elements), getProcessorBatchSize(elements.subList(0, keyLength)),
keyLength, lobManager);
+ return new STree(keyManager, bm, new ListNestedSortComparator(compareIndexes),
getProcessorBatchSize(elements.subList(0, keyLength)), getProcessorBatchSize(elements),
keyLength, lobManager);
}
private static Class<?>[] getTypeClasses(final List elements) {
@@ -464,10 +464,6 @@
this.maxActivePlans = maxActivePlans;
}
- public void setMemoryBufferSpace(long memoryBufferSpace) {
- this.memoryBufferSpace = memoryBufferSpace;
- }
-
public void setMaxProcessingKB(int maxProcessingKB) {
this.maxProcessingKB = maxProcessingKB;
}
@@ -488,8 +484,6 @@
this.maxReserveKB.addAndGet(((int)Math.max(0, (maxMemory - one_gig) * .75)));
}
this.maxReserveKB.addAndGet(((int)Math.max(0, Math.min(one_gig, maxMemory) * .5)));
- int batchOverheadKB =
(int)(this.memoryBufferSpace<0?(this.maxReserveKB.get()<<8):this.memoryBufferSpace)>>20;
- this.maxReserveKB.set(Math.max(0, this.maxReserveKB.get() - batchOverheadKB));
}
this.reserveBatchKB = this.getMaxReserveKB();
if (this.maxProcessingKBOrig == null) {
@@ -574,26 +568,34 @@
int maxToFree = Math.max(maxProcessingKB>>1, reserveBatchKB>>3);
int freed = 0;
while (freed <= maxToFree && activeBatchKB.get() > reserveBatchKB * .8)
{
- CacheEntry ce = memoryEntries.evict();
+ CacheEntry ce = evictionQueue.firstEntry(true);
if (ce == null) {
break;
}
- freed += ce.getSizeEstimate();
- activeBatchKB.addAndGet(-ce.getSizeEstimate());
+ if (!memoryEntries.containsKey(ce.getId())) {
+ continue; //not currently a valid eviction
+ }
+ boolean evicted = true;
try {
- evict(ce);
+ evicted = evict(ce);
} catch (Throwable e) {
LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read batch "+ ce.getId() +" later will result in an
exception"); //$NON-NLS-1$ //$NON-NLS-2$
} finally {
- this.memoryEntries.finishedEviction(ce.getId());
+ synchronized (ce) {
+ if (evicted && memoryEntries.remove(ce.getId()) != null) {
+ freed += ce.getSizeEstimate();
+ activeBatchKB.addAndGet(-ce.getSizeEstimate());
+ evictionQueue.remove(ce); //ensures that an intervening get will still be cleaned
+ }
+ }
}
}
}
- void evict(CacheEntry ce) throws Exception {
+ boolean evict(CacheEntry ce) throws Exception {
Serializer<?> s = ce.getSerializer();
if (s == null) {
- return;
+ return true;
}
boolean persist = false;
synchronized (ce) {
@@ -608,12 +610,13 @@
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, ce.getId(), "writing batch to
storage, total writes: ", count); //$NON-NLS-1$
}
}
- cache.add(ce, s);
+ boolean result = cache.add(ce, s);
if (s.useSoftCache()) {
createSoftReference(ce);
} else if (useWeakReferences) {
weakReferenceCache.getValue(ce); //a get will set the value
}
+ return result;
}
private void createSoftReference(CacheEntry ce) {
@@ -633,6 +636,17 @@
ce = memoryEntries.remove(batch);
}
if (ce != null) {
+ synchronized (ce) {
+ if (retain) {
+ //there is a minute chance the batch was evicted
+ //this call ensures that we won't leak
+ if (memoryEntries.containsKey(batch)) {
+ evictionQueue.touch(ce, false);
+ }
+ } else {
+ evictionQueue.remove(ce);
+ }
+ }
if (!retain) {
BufferManagerImpl.this.remove(ce, true);
}
@@ -655,7 +669,7 @@
if (ce != null && ce.getObject() != null) {
referenceHit.getAndIncrement();
if (retain) {
- addMemoryEntry(ce, null);
+ addMemoryEntry(ce);
} else {
BufferManagerImpl.this.remove(ce, false);
}
@@ -687,13 +701,11 @@
}
}
- void addMemoryEntry(CacheEntry ce, CacheEntry previous) {
+ void addMemoryEntry(CacheEntry ce) {
persistBatchReferences();
synchronized (ce) {
- if (previous != null) {
- ce.setOrderingValue(previous.getOrderingValue());
- }
memoryEntries.put(ce.getId(), ce);
+ evictionQueue.touch(ce, true);
}
activeBatchKB.getAndAdd(ce.getSizeEstimate());
}
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-17
03:12:03 UTC (rev 3553)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.teiid.common.buffer.BaseCacheEntry;
+import org.teiid.common.buffer.CacheKey;
+
+/**
+ * A Concurrent LRFU eviction queue. Has assumptions that match buffer manager usage.
+ * Null values are not allowed.
+ * @param <K>
+ * @param <V>
+ */
+public class LrfuEvictionQueue<V extends BaseCacheEntry> {
+
+ //TODO: prior to Java 7, ConcurrentSkipListMap has a scaling bug in that
+ //the level function limits the effective map size to ~ 2^16
+ //above which it performs comparably under multi-threaded load to a synchronized
LinkedHashMap
+ //just with more CPU overhead vs. wait time.
+ protected NavigableMap<CacheKey, V> evictionQueue = new
ConcurrentSkipListMap<CacheKey, V>();
+ protected AtomicLong clock;
+ //combined recency/frequency lambda value between 0 and 1 lower -> LFU, higher
-> LRU
+ //TODO: adaptively adjust this value. More hits should move closer to LRU
+ protected float crfLamda = .0002f;
+
+ public LrfuEvictionQueue(AtomicLong clock) {
+ this.clock = clock;
+ }
+
+ public boolean remove(V value) {
+ return evictionQueue.remove(value.getKey()) != null;
+ }
+
+ public void touch(V value, boolean initial) {
+ if (!initial) {
+ initial = evictionQueue.remove(value.getKey()) == null;
+ }
+ recordAccess(value, initial);
+ evictionQueue.put(value.getKey(), value);
+ }
+
+ public Collection<V> getEvictionQueue() {
+ return evictionQueue.values();
+ }
+
+ public V firstEntry(boolean poll) {
+ Map.Entry<CacheKey, V> entry = null;
+ if (poll) {
+ entry = evictionQueue.pollFirstEntry();
+ } else {
+ entry = evictionQueue.firstEntry();
+ }
+ if (entry != null) {
+ return entry.getValue();
+ }
+ return null;
+ }
+
+ protected void recordAccess(V value, boolean initial) {
+ assert Thread.holdsLock(value);
+ CacheKey key = value.getKey();
+ float lastAccess = key.getLastAccess();
+ float currentClock = clock.get();
+ if (initial && lastAccess == 0) {
+ return; //we just want to timestamp this as created and not give it an ordering value
+ }
+ float orderingValue = key.getOrderingValue();
+ orderingValue = computeNextOrderingValue(currentClock, lastAccess,
+ orderingValue);
+ value.setKey(new CacheKey(key.getId(), currentClock, orderingValue));
+ }
+
+ float computeNextOrderingValue(float currentTime,
+ float lastAccess, float orderingValue) {
+ orderingValue =
+ (float) (//Frequency component
+ orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
+ //recency component
+ + Math.pow(currentTime, crfLamda));
+ return orderingValue;
+ }
+
+ public float getCrfLamda() {
+ return crfLamda;
+ }
+
+ public void setCrfLamda(float crfLamda) {
+ this.crfLamda = crfLamda;
+ }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-10-14
17:31:57 UTC (rev 3552)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-10-17
03:12:03 UTC (rev 3553)
@@ -111,11 +111,12 @@
}
@Override
- public void add(CacheEntry entry, Serializer<?> s) {
+ public boolean add(CacheEntry entry, Serializer<?> s) {
Map<Long, CacheEntry> group = groups.get(s.getId());
if (group != null) {
group.put(entry.getId(), entry);
}
+ return true;
}
@Override
Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java 2011-10-14
17:31:57 UTC (rev 3552)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/OrderedCache.java 2011-10-17
03:12:03 UTC (rev 3553)
@@ -1,153 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.teiid.common.buffer.BaseCacheEntry;
-
-/**
- * A Concurrent LRFU cache. Has assumptions that match buffermanager usage.
- * Null values are not allowed.
- * @param <K>
- * @param <V>
- */
-public class OrderedCache<K, V extends BaseCacheEntry> {
-
- protected Map<K, V> map;
- //TODO: until Java 7 ConcurrentSkipListMap has a scaling bug in that
- //the level limits the effective map size to ~ 2^16
- //above which it performs comparably under load to a synchronized LinkedHashMap
- //just with more CPU overhead vs. wait time.
- protected NavigableMap<V, K> evictionQueue = new ConcurrentSkipListMap<V,
K>();
- protected Map<K, V> limbo;
- protected AtomicLong clock;
- //combined recency/frequency lamda value between 0 and 1 lower -> LFU, higher
-> LRU
- //TODO: adaptively adjust this value. more hits should move closer to lru
- protected float crfLamda = .0002f;
-
- public OrderedCache(int initialCapacity, float loadFactor, int concurrencyLevel,
AtomicLong clock) {
- map = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor,
concurrencyLevel);
- limbo = new ConcurrentHashMap<K, V>(initialCapacity, loadFactor,
concurrencyLevel);
- this.clock = clock;
- }
-
- public V get(K key) {
- V result = map.get(key);
- if (result == null) {
- result = limbo.get(key);
- }
- if (result != null) {
- synchronized (result) {
- evictionQueue.remove(result);
- recordAccess(result, false);
- evictionQueue.put(result, key);
- }
- }
- return result;
- }
-
- public V remove(K key) {
- V result = map.remove(key);
- if (result != null) {
- synchronized (result) {
- evictionQueue.remove(result);
- }
- }
- return result;
- }
-
- public V put(K key, V value) {
- V result = map.put(key, value);
- if (result != null) {
- synchronized (result) {
- evictionQueue.remove(result);
- }
- }
- synchronized (value) {
- recordAccess(value, result == null);
- evictionQueue.put(value, key);
- }
- return result;
- }
-
- public V evict() {
- Map.Entry<V, K> entry = evictionQueue.pollFirstEntry();
- if (entry == null) {
- return null;
- }
- limbo.put(entry.getValue(), entry.getKey());
- return map.remove(entry.getValue());
- }
-
- public void finishedEviction(K key) {
- limbo.remove(key);
- }
-
- public int size() {
- return map.size();
- }
-
- public Map<V, K> getEvictionQueue() {
- return evictionQueue;
- }
-
- public Map.Entry<V, K> firstEntry() {
- return evictionQueue.firstEntry();
- }
-
- protected void recordAccess(BaseCacheEntry value, boolean initial) {
- float lastAccess = value.getLastAccess();
- value.setLastAccess(clock.get());
- if (initial && lastAccess == 0) {
- return; //we just want to timestamp this as created and not give it an ordering value
- }
- float orderingValue = value.getOrderingValue();
- orderingValue = computeNextOrderingValue(value.getLastAccess(), lastAccess,
- orderingValue);
- value.setOrderingValue(orderingValue);
- }
-
- float computeNextOrderingValue(float currentTime,
- float lastAccess, float orderingValue) {
- orderingValue =
- (float) (//Frequency component
- orderingValue*Math.pow(1-crfLamda, currentTime - lastAccess)
- //recency component
- + Math.pow(currentTime, crfLamda));
- return orderingValue;
- }
-
- public float getCrfLamda() {
- return crfLamda;
- }
-
- public void setCrfLamda(float crfLamda) {
- this.crfLamda = crfLamda;
- }
-
-}
Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-10-14
17:31:57 UTC (rev 3552)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-10-17
03:12:03 UTC (rev 3553)
@@ -68,8 +68,9 @@
private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE>>20;
private boolean inlineLobs = true;
- private int memoryBufferSpace = -1;
+ private long memoryBufferSpace = -1;
private int maxStorageObjectSize =
BufferFrontedFileStoreCache.DEFAuLT_MAX_OBJECT_SIZE;
+ private boolean memoryBufferOffHeap;
private FileStorageManager fsm;
/**
@@ -93,7 +94,6 @@
this.bufferMgr.setProcessorBatchSize(Integer.valueOf(processorBatchSize));
this.bufferMgr.setMaxReserveKB(this.maxReserveKb);
this.bufferMgr.setMaxProcessingKB(this.maxProcessingKb);
-
this.bufferMgr.setMemoryBufferSpace(Math.min(BufferFrontedFileStoreCache.MAX_ADDRESSABLE_MEMORY,
this.memoryBufferSpace));
this.bufferMgr.initialize();
// If necessary, add disk storage manager
@@ -110,12 +110,20 @@
ssm.setMaxFileSize(maxFileSize);
BufferFrontedFileStoreCache fsc = new BufferFrontedFileStoreCache();
fsc.setMaxStorageObjectSize(maxStorageObjectSize);
+ fsc.setDirect(memoryBufferOffHeap);
+ int batchOverheadKB =
(int)(this.memoryBufferSpace<0?(this.bufferMgr.getMaxReserveKB()<<8):this.memoryBufferSpace)>>20;
+ this.bufferMgr.setMaxReserveKB(Math.max(0, this.bufferMgr.getMaxReserveKB() -
batchOverheadKB));
if (memoryBufferSpace < 0) {
//use approximately 25% of what's set aside for the reserved
- fsc.setMemoryBufferSpace(this.bufferMgr.getMaxReserveKB() << 8);
+ fsc.setMemoryBufferSpace(((long)this.bufferMgr.getMaxReserveKB())
<< 8);
} else {
- fsc.setMemoryBufferSpace(memoryBufferSpace);
+ //scale from MB to bytes
+ fsc.setMemoryBufferSpace(memoryBufferSpace << 20);
}
+ if (!memoryBufferOffHeap && this.maxReserveKb < 0) {
+ //adjust the value
+ this.bufferMgr.setMaxReserveKB(this.bufferMgr.getMaxReserveKB() -
(int)Math.min(this.bufferMgr.getMaxReserveKB(), (fsc.getMemoryBufferSpace()>>10)));
+ }
fsc.setStorageManager(ssm);
fsc.initialize();
this.bufferMgr.setCache(fsc);
@@ -255,20 +263,29 @@
return bufferMgr.getReadAttempts();
}
- @ManagementProperty(description="Direct memory buffer space used by the buffer
manager in MB. -1 determines the setting automatically from the maxReserveKB (default
-1). This value cannot be smaller than maxStorageObjectSize.")
- public int getMemoryBufferSpace() {
+ @ManagementProperty(description="Memory buffer space used by the buffer manager
in MB. -1 determines the setting automatically from the maxReserveKB (default -1). This
value cannot be smaller than maxStorageObjectSize.")
+ public long getMemoryBufferSpace() {
return memoryBufferSpace;
}
-
+
+ @ManagementProperty(description="The maximum size of a buffer managed object
(typically a table page or a results batch) in bytes (default 8388608).")
public int getMaxStorageObjectSize() {
return maxStorageObjectSize;
}
- @ManagementProperty(description="The maximum size of a buffer managed object
(typically a table page or a results batch) in bytes (default 8388608).")
- public void setMemoryBufferSpace(int maxMemoryBufferSpace) {
- this.memoryBufferSpace = maxMemoryBufferSpace;
+ @ManagementProperty(description="Set to true to hold the memory buffer off-heap.
If true you must ensure that the VM can allocate that much direct memory (default
false).")
+ public boolean isMemoryBufferOffHeap() {
+ return memoryBufferOffHeap;
}
+
+ public void setMemoryBufferOffHeap(boolean memoryBufferOffHeap) {
+ this.memoryBufferOffHeap = memoryBufferOffHeap;
+ }
+ public void setMemoryBufferSpace(int memoryBufferSpace) {
+ this.memoryBufferSpace = memoryBufferSpace;
+ }
+
public void setMaxStorageObjectSize(int maxStorageObjectSize) {
this.maxStorageObjectSize = maxStorageObjectSize;
}