Author: clebert.suconic(a)jboss.com
Date: 2011-02-03 17:17:34 -0500 (Thu, 03 Feb 2011)
New Revision: 10176
Modified:
branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/AddressSettingsInfo.java
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/HornetQServerControl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagePosition.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/AddressSettings.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java
branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java
Log:
Adding page-max-cache-size as a parameter on address settings
Modified: branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd 2011-02-03
11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd 2011-02-03
22:17:34 UTC (rev 10176)
@@ -473,6 +473,8 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="page-size-bytes" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0"
name="page-max-cache-size" type="xsd:int">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="address-full-policy" type="addressFullMessagePolicyType">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="message-counter-history-day-limit" type="xsd:int">
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/AddressSettingsInfo.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/AddressSettingsInfo.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/AddressSettingsInfo.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -34,6 +34,8 @@
private long maxSizeBytes;
private int pageSizeBytes;
+
+ private int pageCacheMaxSize;
private int maxDeliveryAttempts;
@@ -57,6 +59,7 @@
return new
AddressSettingsInfo(object.getString("addressFullMessagePolicy"),
object.getLong("maxSizeBytes"),
object.getInt("pageSizeBytes"),
+ object.getInt("pageCacheMaxSize"),
object.getInt("maxDeliveryAttempts"),
object.getLong("redeliveryDelay"),
object.getString("DLA"),
@@ -71,6 +74,7 @@
public AddressSettingsInfo(String addressFullMessagePolicy,
long maxSizeBytes,
int pageSizeBytes,
+ int pageCacheMaxSize,
int maxDeliveryAttempts,
long redeliveryDelay,
String deadLetterAddress,
@@ -82,6 +86,7 @@
this.addressFullMessagePolicy = addressFullMessagePolicy;
this.maxSizeBytes = maxSizeBytes;
this.pageSizeBytes = pageSizeBytes;
+ this.pageCacheMaxSize = pageCacheMaxSize;
this.maxDeliveryAttempts = maxDeliveryAttempts;
this.redeliveryDelay = redeliveryDelay;
this.deadLetterAddress = deadLetterAddress;
@@ -93,6 +98,16 @@
// Public --------------------------------------------------------
+ public int getPageCacheMaxSize()
+ {
+ return pageCacheMaxSize;
+ }
+
+ public void setPageCacheMaxSize(int pageCacheMaxSize)
+ {
+ this.pageCacheMaxSize = pageCacheMaxSize;
+ }
+
public String getAddressFullMessagePolicy()
{
return addressFullMessagePolicy;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/HornetQServerControl.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -535,6 +535,7 @@
@Parameter(desc="the delivery attempts",
name="deliveryAttempts") int deliveryAttempts,
@Parameter(desc="the max size in bytes",
name="maxSizeBytes") long maxSizeBytes,
@Parameter(desc="the page size in bytes",
name="pageSizeBytes") int pageSizeBytes,
+ @Parameter(desc="the max number of pages in the soft
memory cache", name="pageMaxCacheSize") int pageMaxCacheSize,
@Parameter(desc="the redelivery delay",
name="redeliveryDelay") long redeliveryDelay,
@Parameter(desc="the redistribution delay",
name="redistributionDelay") long redistributionDelay,
@Parameter(desc="do we send to the DLA when there is no
where to route the message", name="sendToDLAOnNoRoute") boolean
sendToDLAOnNoRoute,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -108,6 +108,8 @@
private static final String PAGE_SIZE_BYTES_NODE_NAME = "page-size-bytes";
+ private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME =
"page-max-cache-size";
+
private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME =
"message-counter-history-day-limit";
private static final String LVQ_NODE_NAME = "last-value-queue";
@@ -778,6 +780,10 @@
{
addressSettings.setPageSizeBytes(Long.valueOf(child.getTextContent()));
}
+ else if
(FileConfigurationParser.PAGE_MAX_CACHE_SIZE_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
+ {
+
addressSettings.setPageCacheMaxSize(Integer.valueOf(child.getTextContent()));
+ }
else if
(FileConfigurationParser.MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
addressSettings.setMessageCounterHistoryDayLimit(Integer.valueOf(child.getTextContent()));
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -1489,6 +1489,7 @@
settings.put("expiryAddress", addressSettings.getExpiryAddress());
}
settings.put("maxDeliveryAttempts",
addressSettings.getMaxDeliveryAttempts());
+ settings.put("pageCacheMaxSize", addressSettings.getPageCacheMaxSize());
settings.put("maxSizeBytes", addressSettings.getMaxSizeBytes());
settings.put("pageSizeBytes", addressSettings.getPageSizeBytes());
settings.put("redeliveryDelay", addressSettings.getRedeliveryDelay());
@@ -1504,13 +1505,15 @@
return jsonObject.toString();
}
- public void addAddressSettings(final String address,
+
+ public void addAddressSettings(final String address,
final String DLA,
final String expiryAddress,
final boolean lastValueQueue,
final int deliveryAttempts,
final long maxSizeBytes,
final int pageSizeBytes,
+ final int pageMaxCacheSize,
final long redeliveryDelay,
final long redistributionDelay,
final boolean sendToDLAOnNoRoute,
@@ -1523,6 +1526,7 @@
addressSettings.setExpiryAddress(expiryAddress == null ? null : new
SimpleString(expiryAddress));
addressSettings.setLastValueQueue(lastValueQueue);
addressSettings.setMaxDeliveryAttempts(deliveryAttempts);
+ addressSettings.setPageCacheMaxSize(pageMaxCacheSize);
addressSettings.setMaxSizeBytes(maxSizeBytes);
addressSettings.setPageSizeBytes(pageSizeBytes);
addressSettings.setRedeliveryDelay(redeliveryDelay);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -15,6 +15,7 @@
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.utils.SoftValueHashMap;
/**
* A PageCache
@@ -23,7 +24,7 @@
*
*
*/
-public interface PageCache
+public interface PageCache extends SoftValueHashMap.ValueCache
{
Page getPage();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -66,6 +66,10 @@
// Perform the cleanup at the caller's thread (for startup and recovery)
void cleanup();
+
+ int getCacheMaxSize();
+
+ void setCacheMaxSize(int size);
/**
* @param pageCursorImpl
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagePosition.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -35,15 +35,6 @@
int getMessageNr();
- void setPageCache(PageCache pageCache);
-
- /**
- * PagePosition will hold the page with a weak reference.
- * So, this could be eventually null case soft-cache was released
- * @return
- */
- PageCache getPageCache();
-
PagePosition nextMessage();
PagePosition nextPage();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -62,9 +62,9 @@
private final Executor executor;
- private Map<Long, PageCache> softCache = new SoftValueHashMap<Long,
PageCache>();
+ private final SoftValueHashMap<Long, PageCache> softCache;
- private ConcurrentMap<Long, PageSubscription> activeCursors = new
ConcurrentHashMap<Long, PageSubscription>();
+ private final ConcurrentMap<Long, PageSubscription> activeCursors = new
ConcurrentHashMap<Long, PageSubscription>();
// Static --------------------------------------------------------
@@ -72,12 +72,14 @@
public PageCursorProviderImpl(final PagingStore pagingStore,
final StorageManager storageManager,
- final ExecutorFactory executorFactory)
+ final ExecutorFactory executorFactory,
+ final int maxCacheSize)
{
this.pagingStore = pagingStore;
this.storageManager = storageManager;
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
+ this.softCache = new SoftValueHashMap<Long, PageCache>(maxCacheSize);
}
// Public --------------------------------------------------------
@@ -96,12 +98,12 @@
}
activeCursor = new PageSubscriptionImpl(this,
- pagingStore,
- storageManager,
- executorFactory.getExecutor(),
- filter,
- cursorID,
- persistent);
+ pagingStore,
+ storageManager,
+ executorFactory.getExecutor(),
+ filter,
+ cursorID,
+ persistent);
activeCursors.put(cursorID, activeCursor);
return activeCursor;
}
@@ -126,8 +128,10 @@
return cache.getMessage(pos.getMessageNr());
}
-
- public PagedReference newReference(final PagePosition pos, final PagedMessage msg,
final PageSubscription subscription)
+
+ public PagedReference newReference(final PagePosition pos,
+ final PagedMessage msg,
+ final PageSubscription subscription)
{
return new PagedReferenceImpl(pos, msg, subscription);
}
@@ -137,13 +141,7 @@
*/
public PageCache getPageCache(PagePosition pos)
{
- PageCache cache = pos.getPageCache();
- if (cache == null)
- {
- cache = getPageCache(pos.getPageNr());
- pos.setPageCache(cache);
- }
- return cache;
+ return getPageCache(pos.getPageNr());
}
public void addPageCache(PageCache cache)
@@ -154,6 +152,16 @@
}
}
+ public int getCacheMaxSize()
+ {
+ return softCache.getMaxEelements();
+ }
+
+ public void setCacheMaxSize(final int size)
+ {
+ softCache.setMaxElements(size);
+ }
+
public int getCacheSize()
{
synchronized (softCache)
@@ -245,7 +253,7 @@
{
return;
}
-
+
if (pagingStore.getNumberOfPages() == 0)
{
return;
@@ -255,7 +263,7 @@
cursorList.addAll(activeCursors.values());
long minPage = checkMinPage(cursorList);
-
+
if (minPage == pagingStore.getCurrentWritingPage() &&
pagingStore.getCurrentPage().getNumberOfMessages() > 0)
{
boolean complete = true;
@@ -272,7 +280,8 @@
if (complete)
{
- log.info("Address " + pagingStore.getAddress() + " is
leaving page mode as all messages are consumed and acknowledged from the page
store");
+ log.info("Address " + pagingStore.getAddress() +
+ " is leaving page mode as all messages are consumed and
acknowledged from the page store");
pagingStore.forceAnotherPage();
Page currentPage = pagingStore.getCurrentPage();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -33,9 +33,7 @@
/** ID used for storage */
private long recordID;
-
- private volatile WeakReference<PageCache> cacheReference;
-
+
/**
* @param pageNr
* @param messageNr
@@ -47,12 +45,6 @@
this.messageNr = messageNr;
}
- public PagePositionImpl(long pageNr, int messageNr, PageCache pageCache)
- {
- this(pageNr, messageNr);
- this.setPageCache(pageCache);
- }
-
/**
* @param pageNr
* @param messageNr
@@ -63,31 +55,6 @@
}
/**
- * The cached page associaed with this position
- * @return
- */
- public PageCache getPageCache()
- {
- if (cacheReference == null)
- {
- return null;
- }
- else
- {
- return cacheReference.get();
- }
- }
-
- public void setPageCache(final PageCache cache)
- {
- if (cache != null)
- {
- this.cacheReference = new WeakReference<PageCache>(cache);
- }
- }
-
-
- /**
* @return the recordID
*/
public long getRecordID()
@@ -153,7 +120,7 @@
public PagePosition nextMessage()
{
- return new PagePositionImpl(this.pageNr, this.messageNr + 1, this.getPageCache());
+ return new PagePositionImpl(this.pageNr, this.messageNr + 1);
}
public PagePosition nextPage()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -199,7 +199,7 @@
this.syncTimer = null;
}
- this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager,
executorFactory);
+ this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager,
executorFactory, addressSettings.getPageCacheMaxSize());
}
@@ -213,6 +213,11 @@
pageSize = addressSettings.getPageSizeBytes();
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
+
+ if (cursorProvider != null)
+ {
+ cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
+ }
}
// Public --------------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -44,6 +44,8 @@
public static final long DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
+
+ public static final int DEFAULT_PAGE_MAX_CACHE = 5;
public static final int DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
@@ -60,6 +62,8 @@
private Long maxSizeBytes = null;
private Long pageSizeBytes = null;
+
+ private Integer pageMaxCache = null;
private Boolean dropMessagesWhenFull = null;
@@ -109,6 +113,16 @@
{
pageSizeBytes = pageSize;
}
+
+ public int getPageCacheMaxSize()
+ {
+ return pageMaxCache != null ? pageMaxCache :
AddressSettings.DEFAULT_PAGE_MAX_CACHE;
+ }
+
+ public void setPageCacheMaxSize(final int pageMaxCache)
+ {
+ this.pageMaxCache = pageMaxCache;
+ }
public long getMaxSizeBytes()
{
@@ -209,6 +223,10 @@
{
maxSizeBytes = merged.maxSizeBytes;
}
+ if (pageMaxCache == null)
+ {
+ pageMaxCache = merged.pageMaxCache;
+ }
if (pageSizeBytes == null)
{
pageSizeBytes = merged.getPageSizeBytes();
@@ -262,6 +280,8 @@
maxSizeBytes = BufferHelper.readNullableLong(buffer);
pageSizeBytes = BufferHelper.readNullableLong(buffer);
+
+ pageMaxCache = BufferHelper.readNullableInteger(buffer);
dropMessagesWhenFull = BufferHelper.readNullableBoolean(buffer);
@@ -291,6 +311,7 @@
return BufferHelper.sizeOfNullableSimpleString(addressFullMessagePolicy != null ?
addressFullMessagePolicy.toString()
:
null) + BufferHelper.sizeOfNullableLong(maxSizeBytes) +
BufferHelper.sizeOfNullableLong(pageSizeBytes) +
+ BufferHelper.sizeOfNullableInteger(pageMaxCache) +
BufferHelper.sizeOfNullableBoolean(dropMessagesWhenFull) +
BufferHelper.sizeOfNullableInteger(maxDeliveryAttempts) +
BufferHelper.sizeOfNullableInteger(messageCounterHistoryDayLimit) +
@@ -314,6 +335,8 @@
BufferHelper.writeNullableLong(buffer, pageSizeBytes);
+ BufferHelper.writeNullableInteger(buffer, pageMaxCache);
+
BufferHelper.writeNullableBoolean(buffer, dropMessagesWhenFull);
BufferHelper.writeNullableInteger(buffer, maxDeliveryAttempts);
@@ -351,6 +374,7 @@
result = prime * result +
((messageCounterHistoryDayLimit == null) ? 0 :
messageCounterHistoryDayLimit.hashCode());
result = prime * result + ((pageSizeBytes == null) ? 0 :
pageSizeBytes.hashCode());
+ result = prime * result + ((pageMaxCache == null) ? 0 : pageMaxCache.hashCode());
result = prime * result + ((redeliveryDelay == null) ? 0 :
redeliveryDelay.hashCode());
result = prime * result + ((redistributionDelay == null) ? 0 :
redistributionDelay.hashCode());
result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 :
sendToDLAOnNoRoute.hashCode());
@@ -433,6 +457,13 @@
}
else if (!pageSizeBytes.equals(other.pageSizeBytes))
return false;
+ if (pageMaxCache == null)
+ {
+ if (other.pageMaxCache != null)
+ return false;
+ }
+ else if (!pageMaxCache.equals(other.pageMaxCache))
+ return false;
if (redeliveryDelay == null)
{
if (other.redeliveryDelay != null)
@@ -480,6 +511,8 @@
messageCounterHistoryDayLimit +
", pageSizeBytes=" +
pageSizeBytes +
+ ", pageMaxCache=" +
+ pageMaxCache +
", redeliveryDelay=" +
redeliveryDelay +
", redistributionDelay=" +
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java 2011-02-03
11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -17,19 +17,22 @@
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * A SoftValueConcurrentHashMap
+ * A SoftValueHashMap
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
*
*
*/
-public class SoftValueHashMap<K, V> implements Map<K, V>
+public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implements
Map<K, V>
{
// The soft references that are already good.
// too bad there's no way to override the queue method on ReferenceQueue, so I
wouldn't need this
@@ -37,16 +40,42 @@
private final Map<K, AggregatedSoftReference> mapDelegate = new HashMap<K,
AggregatedSoftReference>();
+ private final AtomicLong nextId = new AtomicLong(0);
+
+ private int maxElements;
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
+ public static abstract interface ValueCache
+ {
+ public abstract boolean isLive();
+ }
+
// Constructors --------------------------------------------------
+ public SoftValueHashMap(final int maxElements)
+ {
+ this.maxElements = maxElements;
+ }
+
// Public --------------------------------------------------------
+
+ public void setMaxElements(final int maxElements)
+ {
+ this.maxElements = maxElements;
+ checkCacheSize();
+ }
+
+ public int getMaxEelements()
+ {
+ return this.maxElements;
+ }
+
/**
* @return
* @see java.util.Map#size()
@@ -109,6 +138,7 @@
AggregatedSoftReference value = mapDelegate.get(key);
if (value != null)
{
+ value.used();
return value.get();
}
else
@@ -127,6 +157,7 @@
{
processQueue();
AggregatedSoftReference refPut = mapDelegate.put(key, createReference(key,
value));
+ checkCacheSize();
if (refPut != null)
{
return refPut.get();
@@ -137,6 +168,66 @@
}
}
+ private void checkCacheSize()
+ {
+ if (maxElements > 0 && mapDelegate.size() > maxElements)
+ {
+ TreeSet<AggregatedSoftReference> usedReferences = new
TreeSet<AggregatedSoftReference>(new ComparatorAgregated());
+
+ for (AggregatedSoftReference ref : mapDelegate.values())
+ {
+ V v = ref.get();
+
+ if (v != null && !v.isLive())
+ {
+ usedReferences.add(ref);
+ }
+ }
+
+ for (AggregatedSoftReference ref : usedReferences)
+ {
+ mapDelegate.remove(ref.key);
+
+ if (mapDelegate.size() <= maxElements)
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ class ComparatorAgregated implements Comparator<AggregatedSoftReference>
+ {
+ public int compare(AggregatedSoftReference o1, AggregatedSoftReference o2)
+ {
+ long k = o1.used - o2.used;
+
+ if (k > 0)
+ {
+ return 1;
+ }
+ else if (k < 0)
+ {
+ return -1;
+ }
+
+ k = o1.id - o2.id;
+
+ if (k > 0)
+ {
+ return 1;
+ }
+ else if (k < 0)
+ {
+ return -1;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ }
+
/**
* @param key
* @return
@@ -222,7 +313,7 @@
V value = pair.getValue().get();
if (value != null)
{
- set.add(new EntryElement<K,V>(pair.getKey(), value));
+ set.add(new EntryElement<K, V>(pair.getKey(), value));
}
}
return set;
@@ -262,6 +353,7 @@
AggregatedSoftReference ref = null;
while ((ref = (AggregatedSoftReference)this.refQueue.poll()) != null)
{
+ System.out.println("Removing " + ref.key);
mapDelegate.remove(ref.key);
}
}
@@ -278,6 +370,20 @@
{
final K key;
+ long id = nextId.incrementAndGet();
+
+ long used = 0;
+
+ public long getUsed()
+ {
+ return used;
+ }
+
+ public void used()
+ {
+ used++;
+ }
+
public AggregatedSoftReference(final K key, final V referent)
{
super(referent, refQueue);
Modified: branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml 2011-02-03
11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml 2011-02-03
22:17:34 UTC (rev 10176)
@@ -202,6 +202,7 @@
<redelivery-delay>1</redelivery-delay>
<max-size-bytes>81781728121878</max-size-bytes>
<page-size-bytes>81738173872337</page-size-bytes>
+ <page-max-cache-size>10</page-max-cache-size>
<message-counter-history-day-limit>4</message-counter-history-day-limit>
</address-setting>
<address-setting match="a2">
@@ -210,6 +211,7 @@
<redelivery-delay>5</redelivery-delay>
<max-size-bytes>932489234928324</max-size-bytes>
<page-size-bytes>7126716262626</page-size-bytes>
+ <page-max-cache-size>20</page-max-cache-size>
<message-counter-history-day-limit>8</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -879,6 +879,7 @@
1024 * 1024,
1024 * 10,
5,
+ 5,
0,
false,
"PAGE");
@@ -945,6 +946,7 @@
100 * 1024,
10 * 1024,
5,
+ 5,
0,
false,
"PAGE");
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -480,6 +480,7 @@
int deliveryAttempts = 1;
long maxSizeBytes = 2;
int pageSizeBytes = 3;
+ int pageMaxCacheSize = 7;
long redeliveryDelay = 4;
long redistributionDelay = 5;
boolean sendToDLAOnNoRoute = true;
@@ -492,6 +493,7 @@
deliveryAttempts,
maxSizeBytes,
pageSizeBytes,
+ pageMaxCacheSize,
redeliveryDelay,
redistributionDelay,
sendToDLAOnNoRoute,
@@ -508,6 +510,7 @@
assertEquals(lastValueQueue, info.isLastValueQueue());
assertEquals(deliveryAttempts, info.getMaxDeliveryAttempts());
assertEquals(maxSizeBytes, info.getMaxSizeBytes());
+ assertEquals(pageMaxCacheSize, info.getPageCacheMaxSize());
assertEquals(pageSizeBytes, info.getPageSizeBytes());
assertEquals(redeliveryDelay, info.getRedeliveryDelay());
assertEquals(redistributionDelay, info.getRedistributionDelay());
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -512,6 +512,7 @@
@Parameter(desc = "the delivery
attempts", name = "deliveryAttempts") int deliveryAttempts,
@Parameter(desc = "the max size in
bytes", name = "maxSizeBytes") long maxSizeBytes,
@Parameter(desc = "the page size in
bytes", name = "pageSizeBytes") int pageSizeBytes,
+ int pageMaxCacheSize,
@Parameter(desc = "the redelivery
delay", name = "redeliveryDelay") long redeliveryDelay,
@Parameter(desc = "the redistribution
delay", name = "redistributionDelay") long redistributionDelay,
@Parameter(desc = "do we send to the DLA
when there is no where to route the message", name = "sendToDLAOnNoRoute")
boolean sendToDLAOnNoRoute,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -90,7 +90,7 @@
public void testReadCache() throws Exception
{
- final int NUM_MESSAGES = 1000;
+ final int NUM_MESSAGES = 100;
int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
@@ -98,7 +98,8 @@
PageCursorProviderImpl cursorProvider = new
PageCursorProviderImpl(lookupPageStore(ADDRESS),
server.getStorageManager(),
-
server.getExecutorFactory());
+
server.getExecutorFactory(),
+ 5);
for (int i = 0; i < numberOfPages; i++)
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -260,6 +260,7 @@
assertEquals(1,
conf.getAddressesSettings().get("a1").getRedeliveryDelay());
assertEquals(81781728121878l,
conf.getAddressesSettings().get("a1").getMaxSizeBytes());
assertEquals(81738173872337l,
conf.getAddressesSettings().get("a1").getPageSizeBytes());
+ assertEquals(10,
conf.getAddressesSettings().get("a1").getPageCacheMaxSize());
assertEquals(4,
conf.getAddressesSettings().get("a1").getMessageCounterHistoryDayLimit());
assertEquals("a2.1",
conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
@@ -267,6 +268,7 @@
assertEquals(5,
conf.getAddressesSettings().get("a2").getRedeliveryDelay());
assertEquals(932489234928324l,
conf.getAddressesSettings().get("a2").getMaxSizeBytes());
assertEquals(7126716262626l,
conf.getAddressesSettings().get("a2").getPageSizeBytes());
+ assertEquals(20,
conf.getAddressesSettings().get("a2").getPageCacheMaxSize());
assertEquals(8,
conf.getAddressesSettings().get("a2").getMessageCounterHistoryDayLimit());
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java 2011-02-03
11:35:45 UTC (rev 10175)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java 2011-02-03
22:17:34 UTC (rev 10176)
@@ -40,27 +40,111 @@
{
forceGC();
long maxMemory = Runtime.getRuntime().maxMemory() -
Runtime.getRuntime().freeMemory();
-
+
// each buffer will be 1/10th of the maxMemory
int bufferSize = (int)(maxMemory / 100);
+
+ class Value implements SoftValueHashMap.ValueCache
+ {
+ byte[] payload;
+
+ Value(byte[] payload)
+ {
+ this.payload = payload;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.SoftValueHashMap.ValueCache#isLive()
+ */
+ public boolean isLive()
+ {
+ return false;
+ }
+ }
+
+ SoftValueHashMap<Long, Value> softCache = new SoftValueHashMap<Long,
Value>(100);
+
+ final int MAX_ELEMENTS = 1000;
+
+ for (long i = 0; i < MAX_ELEMENTS; i++)
+ {
+ softCache.put(i, new Value(new byte[bufferSize]));
+ }
+
+ assertTrue(softCache.size() < MAX_ELEMENTS);
- SoftValueHashMap<Long, byte[]> softCache = new SoftValueHashMap<Long,
byte[]>();
+ System.out.println("SoftCache.size " + softCache.size());
+
+ System.out.println("Soft cache has " + softCache.size() + "
elements");
+ }
+
+
+ public void testEvictionsLeastUsed()
+ {
+ forceGC();
+
+ class Value implements SoftValueHashMap.ValueCache
+ {
+ byte[] payload;
+
+ boolean live;
+
+ Value(byte[] payload)
+ {
+ this.payload = payload;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.SoftValueHashMap.ValueCache#isLive()
+ */
+ public boolean isLive()
+ {
+ return live;
+ }
+
+ public void setLive(boolean live)
+ {
+ this.live = live;
+ }
+ }
+
+ SoftValueHashMap<Long, Value> softCache = new SoftValueHashMap<Long,
Value>(200);
- final int MAX_ELEMENTS = 1000;
+ for (long i = 0 ; i < 100; i++)
+ {
+ Value v = new Value(new byte[1]);
+ v.setLive(true);
+ softCache.put(i, v);
+ }
- for (long i = 0 ; i < MAX_ELEMENTS; i++)
+ for (long i = 100; i < 200; i++)
{
- softCache.put(i, new byte[bufferSize]);
+ Value v = new Value(new byte[1]);
+ softCache.put(i, v);
}
+ assertNotNull(softCache.get(100l));
- assertTrue(softCache.size() < MAX_ELEMENTS);
+ softCache.put(300l, new Value(new byte[1]));
+ // these are live, so they shouldn't go
+
+ for (long i = 0; i < 100; i++)
+ {
+ assertNotNull(softCache.get(i));
+ }
+
+ // this was accessed, so it shouldn't go
+ assertNotNull(softCache.get(100l));
+
+ // this is the next one, so it should go
+ assertNull(softCache.get(101l));
+
+ System.out.println("SoftCache.size " + softCache.size());
+
System.out.println("Soft cache has " + softCache.size() + "
elements");
}
-
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------