[hornetq-commits] JBoss hornetq SVN: r10176 - in branches/Branch_2_2_EAP: src/main/org/hornetq/api/core/management and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Feb 3 17:17:35 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-03 17:17:34 -0500 (Thu, 03 Feb 2011)
New Revision: 10176

Modified:
   branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd
   branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/AddressSettingsInfo.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/HornetQServerControl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagePosition.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/AddressSettings.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java
   branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java
Log:
Adding page-max-cache-size as a parameter on address settings

Modified: branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/config/common/schema/hornetq-configuration.xsd	2011-02-03 22:17:34 UTC (rev 10176)
@@ -473,6 +473,8 @@
         </xsd:element>
         <xsd:element maxOccurs="1" minOccurs="0" name="page-size-bytes" type="xsd:long">
         </xsd:element>
+        <xsd:element maxOccurs="1" minOccurs="0" name="page-max-cache-size" type="xsd:int">
+        </xsd:element>
         <xsd:element maxOccurs="1" minOccurs="0" name="address-full-policy" type="addressFullMessagePolicyType">
 		</xsd:element>                
         <xsd:element maxOccurs="1" minOccurs="0" name="message-counter-history-day-limit" type="xsd:int">

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/AddressSettingsInfo.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/AddressSettingsInfo.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/AddressSettingsInfo.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -34,6 +34,8 @@
    private long maxSizeBytes;
 
    private int pageSizeBytes;
+   
+   private int pageCacheMaxSize;
 
    private int maxDeliveryAttempts;
 
@@ -57,6 +59,7 @@
       return new AddressSettingsInfo(object.getString("addressFullMessagePolicy"),
                                      object.getLong("maxSizeBytes"),
                                      object.getInt("pageSizeBytes"),
+                                     object.getInt("pageCacheMaxSize"),
                                      object.getInt("maxDeliveryAttempts"),
                                      object.getLong("redeliveryDelay"),
                                      object.getString("DLA"),
@@ -71,6 +74,7 @@
    public AddressSettingsInfo(String addressFullMessagePolicy,
                               long maxSizeBytes,
                               int pageSizeBytes,
+                              int pageCacheMaxSize,
                               int maxDeliveryAttempts,
                               long redeliveryDelay,
                               String deadLetterAddress,
@@ -82,6 +86,7 @@
       this.addressFullMessagePolicy = addressFullMessagePolicy;
       this.maxSizeBytes = maxSizeBytes;
       this.pageSizeBytes = pageSizeBytes;
+      this.pageCacheMaxSize = pageCacheMaxSize;
       this.maxDeliveryAttempts = maxDeliveryAttempts;
       this.redeliveryDelay = redeliveryDelay;
       this.deadLetterAddress = deadLetterAddress;
@@ -93,6 +98,16 @@
 
    // Public --------------------------------------------------------
 
+   public int getPageCacheMaxSize()
+   {
+      return pageCacheMaxSize;
+   }
+
+   public void setPageCacheMaxSize(int pageCacheMaxSize)
+   {
+      this.pageCacheMaxSize = pageCacheMaxSize;
+   }
+
    public String getAddressFullMessagePolicy()
    {
       return addressFullMessagePolicy;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/HornetQServerControl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/HornetQServerControl.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/HornetQServerControl.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -535,6 +535,7 @@
                            @Parameter(desc="the delivery attempts", name="deliveryAttempts") int deliveryAttempts,
                            @Parameter(desc="the max size in bytes", name="maxSizeBytes") long maxSizeBytes,
                            @Parameter(desc="the page size in bytes", name="pageSizeBytes") int pageSizeBytes,
+                           @Parameter(desc="the max number of pages in the soft memory cache", name="pageMaxCacheSize") int pageMaxCacheSize,
                            @Parameter(desc="the redelivery delay", name="redeliveryDelay") long redeliveryDelay,
                            @Parameter(desc="the redistribution delay", name="redistributionDelay") long redistributionDelay,
                            @Parameter(desc="do we send to the DLA when there is no where to route the message", name="sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute,

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -108,6 +108,8 @@
 
    private static final String PAGE_SIZE_BYTES_NODE_NAME = "page-size-bytes";
 
+   private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = "page-max-cache-size";
+
    private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit";
 
    private static final String LVQ_NODE_NAME = "last-value-queue";
@@ -778,6 +780,10 @@
          {
             addressSettings.setPageSizeBytes(Long.valueOf(child.getTextContent()));
          }
+         else if (FileConfigurationParser.PAGE_MAX_CACHE_SIZE_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
+         {
+            addressSettings.setPageCacheMaxSize(Integer.valueOf(child.getTextContent()));
+         }
          else if (FileConfigurationParser.MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
          {
             addressSettings.setMessageCounterHistoryDayLimit(Integer.valueOf(child.getTextContent()));

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -1489,6 +1489,7 @@
          settings.put("expiryAddress", addressSettings.getExpiryAddress());
       }
       settings.put("maxDeliveryAttempts", addressSettings.getMaxDeliveryAttempts());
+      settings.put("pageCacheMaxSize", addressSettings.getPageCacheMaxSize());
       settings.put("maxSizeBytes", addressSettings.getMaxSizeBytes());
       settings.put("pageSizeBytes", addressSettings.getPageSizeBytes());
       settings.put("redeliveryDelay", addressSettings.getRedeliveryDelay());
@@ -1504,13 +1505,15 @@
       return jsonObject.toString();
    }
 
-   public void addAddressSettings(final String address,
+   
+    public void addAddressSettings(final String address,
                                   final String DLA,
                                   final String expiryAddress,
                                   final boolean lastValueQueue,
                                   final int deliveryAttempts,
                                   final long maxSizeBytes,
                                   final int pageSizeBytes,
+                                  final int pageMaxCacheSize,
                                   final long redeliveryDelay,
                                   final long redistributionDelay,
                                   final boolean sendToDLAOnNoRoute,
@@ -1523,6 +1526,7 @@
       addressSettings.setExpiryAddress(expiryAddress == null ? null : new SimpleString(expiryAddress));
       addressSettings.setLastValueQueue(lastValueQueue);
       addressSettings.setMaxDeliveryAttempts(deliveryAttempts);
+      addressSettings.setPageCacheMaxSize(pageMaxCacheSize);
       addressSettings.setMaxSizeBytes(maxSizeBytes);
       addressSettings.setPageSizeBytes(pageSizeBytes);
       addressSettings.setRedeliveryDelay(redeliveryDelay);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCache.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -15,6 +15,7 @@
 
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.utils.SoftValueHashMap;
 
 /**
  * A PageCache
@@ -23,7 +24,7 @@
  *
  *
  */
-public interface PageCache
+public interface PageCache extends SoftValueHashMap.ValueCache
 {
    Page getPage();
    

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -66,6 +66,10 @@
    
    // Perform the cleanup at the caller's thread (for startup and recovery)
    void cleanup();
+   
+   int getCacheMaxSize();
+   
+   void setCacheMaxSize(int size);
 
    /**
     * @param pageCursorImpl

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -35,15 +35,6 @@
 
    int getMessageNr();
    
-   void setPageCache(PageCache pageCache);
-   
-   /**
-    * PagePosition will hold the page with a weak reference.
-    * So, this could be eventually null case soft-cache was released
-    * @return
-    */
-   PageCache getPageCache();
-
    PagePosition nextMessage();
 
    PagePosition nextPage();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -62,9 +62,9 @@
 
    private final Executor executor;
 
-   private Map<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
+   private final SoftValueHashMap<Long, PageCache> softCache;
 
-   private ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>();
+   private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>();
 
    // Static --------------------------------------------------------
 
@@ -72,12 +72,14 @@
 
    public PageCursorProviderImpl(final PagingStore pagingStore,
                                  final StorageManager storageManager,
-                                 final ExecutorFactory executorFactory)
+                                 final ExecutorFactory executorFactory,
+                                 final int maxCacheSize)
    {
       this.pagingStore = pagingStore;
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
       this.executor = executorFactory.getExecutor();
+      this.softCache = new SoftValueHashMap<Long, PageCache>(maxCacheSize);
    }
 
    // Public --------------------------------------------------------
@@ -96,12 +98,12 @@
       }
 
       activeCursor = new PageSubscriptionImpl(this,
-                                        pagingStore,
-                                        storageManager,
-                                        executorFactory.getExecutor(),
-                                        filter,
-                                        cursorID,
-                                        persistent);
+                                              pagingStore,
+                                              storageManager,
+                                              executorFactory.getExecutor(),
+                                              filter,
+                                              cursorID,
+                                              persistent);
       activeCursors.put(cursorID, activeCursor);
       return activeCursor;
    }
@@ -126,8 +128,10 @@
 
       return cache.getMessage(pos.getMessageNr());
    }
-   
-   public PagedReference newReference(final PagePosition pos, final PagedMessage msg, final PageSubscription subscription)
+
+   public PagedReference newReference(final PagePosition pos,
+                                      final PagedMessage msg,
+                                      final PageSubscription subscription)
    {
       return new PagedReferenceImpl(pos, msg, subscription);
    }
@@ -137,13 +141,7 @@
     */
    public PageCache getPageCache(PagePosition pos)
    {
-      PageCache cache = pos.getPageCache();
-      if (cache == null)
-      {
-         cache = getPageCache(pos.getPageNr());
-         pos.setPageCache(cache);
-      }
-      return cache;
+      return getPageCache(pos.getPageNr());
    }
 
    public void addPageCache(PageCache cache)
@@ -154,6 +152,16 @@
       }
    }
 
+   public int getCacheMaxSize()
+   {
+      return softCache.getMaxEelements();
+   }
+
+   public void setCacheMaxSize(final int size)
+   {
+      softCache.setMaxElements(size);
+   }
+
    public int getCacheSize()
    {
       synchronized (softCache)
@@ -245,7 +253,7 @@
             {
                return;
             }
-            
+
             if (pagingStore.getNumberOfPages() == 0)
             {
                return;
@@ -255,7 +263,7 @@
             cursorList.addAll(activeCursors.values());
 
             long minPage = checkMinPage(cursorList);
-            
+
             if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
             {
                boolean complete = true;
@@ -272,7 +280,8 @@
                if (complete)
                {
 
-                  log.info("Address " + pagingStore.getAddress() + " is leaving page mode as all messages are consumed and acknowledged from the page store");
+                  log.info("Address " + pagingStore.getAddress() +
+                           " is leaving page mode as all messages are consumed and acknowledged from the page store");
                   pagingStore.forceAnotherPage();
 
                   Page currentPage = pagingStore.getCurrentPage();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -33,9 +33,7 @@
 
    /** ID used for storage */
    private long recordID;
-   
-   private volatile WeakReference<PageCache> cacheReference;
-
+ 
    /**
     * @param pageNr
     * @param messageNr
@@ -47,12 +45,6 @@
       this.messageNr = messageNr;
    }
 
-   public PagePositionImpl(long pageNr, int messageNr, PageCache pageCache)
-   {
-      this(pageNr, messageNr);
-      this.setPageCache(pageCache);
-   }
-
    /**
     * @param pageNr
     * @param messageNr
@@ -63,31 +55,6 @@
    }
 
    /**
-    * The cached page associaed with this position
-    * @return
-    */
-   public PageCache getPageCache()
-   {
-      if (cacheReference == null)
-      {
-         return null;
-      }
-      else
-      {
-         return cacheReference.get();
-      }
-   }
-   
-   public void setPageCache(final PageCache cache)
-   {
-      if (cache != null)
-      {
-         this.cacheReference = new WeakReference<PageCache>(cache);
-      }
-   }
-
-
-   /**
     * @return the recordID
     */
    public long getRecordID()
@@ -153,7 +120,7 @@
 
    public PagePosition nextMessage()
    {
-      return new PagePositionImpl(this.pageNr, this.messageNr + 1, this.getPageCache());
+      return new PagePositionImpl(this.pageNr, this.messageNr + 1);
    }
 
    public PagePosition nextPage()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -199,7 +199,7 @@
          this.syncTimer = null;
       }
 
-      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
+      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory, addressSettings.getPageCacheMaxSize());
 
    }
 
@@ -213,6 +213,11 @@
       pageSize = addressSettings.getPageSizeBytes();
 
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
+      
+      if (cursorProvider != null)
+      {
+         cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
+      }
    }
 
    // Public --------------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/AddressSettings.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/AddressSettings.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -44,6 +44,8 @@
    public static final long DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
 
    public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
+   
+   public static final int DEFAULT_PAGE_MAX_CACHE = 5;
 
    public static final int DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
 
@@ -60,6 +62,8 @@
    private Long maxSizeBytes = null;
 
    private Long pageSizeBytes = null;
+   
+   private Integer pageMaxCache = null;
 
    private Boolean dropMessagesWhenFull = null;
 
@@ -109,6 +113,16 @@
    {
       pageSizeBytes = pageSize;
    }
+   
+   public int getPageCacheMaxSize()
+   {
+      return pageMaxCache != null ? pageMaxCache : AddressSettings.DEFAULT_PAGE_MAX_CACHE;
+   }
+   
+   public void setPageCacheMaxSize(final int pageMaxCache)
+   {
+      this.pageMaxCache = pageMaxCache;
+   }
 
    public long getMaxSizeBytes()
    {
@@ -209,6 +223,10 @@
       {
          maxSizeBytes = merged.maxSizeBytes;
       }
+      if (pageMaxCache == null)
+      {
+         pageMaxCache = merged.pageMaxCache;
+      }
       if (pageSizeBytes == null)
       {
          pageSizeBytes = merged.getPageSizeBytes();
@@ -262,6 +280,8 @@
       maxSizeBytes = BufferHelper.readNullableLong(buffer);
 
       pageSizeBytes = BufferHelper.readNullableLong(buffer);
+      
+      pageMaxCache = BufferHelper.readNullableInteger(buffer);
 
       dropMessagesWhenFull = BufferHelper.readNullableBoolean(buffer);
 
@@ -291,6 +311,7 @@
       return BufferHelper.sizeOfNullableSimpleString(addressFullMessagePolicy != null ? addressFullMessagePolicy.toString()
                                                                                      : null) + BufferHelper.sizeOfNullableLong(maxSizeBytes) +
              BufferHelper.sizeOfNullableLong(pageSizeBytes) +
+             BufferHelper.sizeOfNullableInteger(pageMaxCache) +
              BufferHelper.sizeOfNullableBoolean(dropMessagesWhenFull) +
              BufferHelper.sizeOfNullableInteger(maxDeliveryAttempts) +
              BufferHelper.sizeOfNullableInteger(messageCounterHistoryDayLimit) +
@@ -314,6 +335,8 @@
 
       BufferHelper.writeNullableLong(buffer, pageSizeBytes);
 
+      BufferHelper.writeNullableInteger(buffer, pageMaxCache);
+
       BufferHelper.writeNullableBoolean(buffer, dropMessagesWhenFull);
 
       BufferHelper.writeNullableInteger(buffer, maxDeliveryAttempts);
@@ -351,6 +374,7 @@
       result = prime * result +
                ((messageCounterHistoryDayLimit == null) ? 0 : messageCounterHistoryDayLimit.hashCode());
       result = prime * result + ((pageSizeBytes == null) ? 0 : pageSizeBytes.hashCode());
+      result = prime * result + ((pageMaxCache == null) ? 0 : pageMaxCache.hashCode());
       result = prime * result + ((redeliveryDelay == null) ? 0 : redeliveryDelay.hashCode());
       result = prime * result + ((redistributionDelay == null) ? 0 : redistributionDelay.hashCode());
       result = prime * result + ((sendToDLAOnNoRoute == null) ? 0 : sendToDLAOnNoRoute.hashCode());
@@ -433,6 +457,13 @@
       }
       else if (!pageSizeBytes.equals(other.pageSizeBytes))
          return false;
+      if (pageMaxCache == null)
+      {
+         if (other.pageMaxCache != null)
+            return false;
+      }
+      else if (!pageMaxCache.equals(other.pageMaxCache))
+         return false;
       if (redeliveryDelay == null)
       {
          if (other.redeliveryDelay != null)
@@ -480,6 +511,8 @@
              messageCounterHistoryDayLimit +
              ", pageSizeBytes=" +
              pageSizeBytes +
+             ", pageMaxCache=" +
+             pageMaxCache +
              ", redeliveryDelay=" +
              redeliveryDelay +
              ", redistributionDelay=" +

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/SoftValueHashMap.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -17,19 +17,22 @@
 import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * A SoftValueConcurrentHashMap
+ * A SoftValueHashMap
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
  *
  */
-public class SoftValueHashMap<K, V> implements Map<K, V>
+public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implements Map<K, V>
 {
    // The soft references that are already good.
    // too bad there's no way to override the queue method on ReferenceQueue, so I wouldn't need this
@@ -37,16 +40,42 @@
 
    private final Map<K, AggregatedSoftReference> mapDelegate = new HashMap<K, AggregatedSoftReference>();
 
+   private final AtomicLong nextId = new AtomicLong(0);
+
+   private int maxElements;
+
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
 
+   public static abstract interface ValueCache
+   {
+      public abstract boolean isLive();
+   }
+
    // Constructors --------------------------------------------------
 
+   public SoftValueHashMap(final int maxElements)
+   {
+      this.maxElements = maxElements;
+   }
+
    // Public --------------------------------------------------------
 
+   
+   public void setMaxElements(final int maxElements)
+   {
+      this.maxElements = maxElements;
+      checkCacheSize();
+   }
+   
+   public int getMaxEelements()
+   {
+      return this.maxElements;
+   }
+   
    /**
     * @return
     * @see java.util.Map#size()
@@ -109,6 +138,7 @@
       AggregatedSoftReference value = mapDelegate.get(key);
       if (value != null)
       {
+         value.used();
          return value.get();
       }
       else
@@ -127,6 +157,7 @@
    {
       processQueue();
       AggregatedSoftReference refPut = mapDelegate.put(key, createReference(key, value));
+      checkCacheSize();
       if (refPut != null)
       {
          return refPut.get();
@@ -137,6 +168,66 @@
       }
    }
 
+   private void checkCacheSize()
+   {
+      if (maxElements > 0 && mapDelegate.size() > maxElements)
+      {
+         TreeSet<AggregatedSoftReference> usedReferences = new TreeSet<AggregatedSoftReference>(new ComparatorAgregated());
+         
+         for (AggregatedSoftReference ref : mapDelegate.values())
+         {
+            V v = ref.get();
+            
+            if (v != null && !v.isLive())
+            {
+               usedReferences.add(ref);
+            }
+         }
+         
+         for (AggregatedSoftReference ref : usedReferences)
+         {
+            mapDelegate.remove(ref.key);
+
+            if (mapDelegate.size() <= maxElements)
+            {
+               break;
+            }
+         }
+      }
+   }
+
+   class ComparatorAgregated implements Comparator<AggregatedSoftReference>
+   {
+      public int compare(AggregatedSoftReference o1, AggregatedSoftReference o2)
+      {
+         long k = o1.used - o2.used;
+
+         if (k > 0)
+         {
+            return 1;
+         }
+         else if (k < 0)
+         {
+            return -1;
+         }
+         
+         k = o1.id - o2.id;
+         
+         if (k > 0)
+         {
+            return 1;
+         }
+         else if (k < 0) 
+         {
+            return -1;
+         }
+         else
+         {
+            return 0;
+         }
+      }
+   }
+
    /**
     * @param key
     * @return
@@ -222,7 +313,7 @@
          V value = pair.getValue().get();
          if (value != null)
          {
-            set.add(new EntryElement<K,V>(pair.getKey(), value));
+            set.add(new EntryElement<K, V>(pair.getKey(), value));
          }
       }
       return set;
@@ -262,6 +353,7 @@
       AggregatedSoftReference ref = null;
       while ((ref = (AggregatedSoftReference)this.refQueue.poll()) != null)
       {
+         System.out.println("Removing " + ref.key);
          mapDelegate.remove(ref.key);
       }
    }
@@ -278,6 +370,20 @@
    {
       final K key;
 
+      long id = nextId.incrementAndGet();
+
+      long used = 0;
+
+      public long getUsed()
+      {
+         return used;
+      }
+
+      public void used()
+      {
+         used++;
+      }
+
       public AggregatedSoftReference(final K key, final V referent)
       {
          super(referent, refQueue);

Modified: branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/tests/config/ConfigurationTest-full-config.xml	2011-02-03 22:17:34 UTC (rev 10176)
@@ -202,6 +202,7 @@
          <redelivery-delay>1</redelivery-delay>
          <max-size-bytes>81781728121878</max-size-bytes>
          <page-size-bytes>81738173872337</page-size-bytes>
+         <page-max-cache-size>10</page-max-cache-size>
          <message-counter-history-day-limit>4</message-counter-history-day-limit>
       </address-setting>
       <address-setting match="a2">
@@ -210,6 +211,7 @@
          <redelivery-delay>5</redelivery-delay>
          <max-size-bytes>932489234928324</max-size-bytes>
          <page-size-bytes>7126716262626</page-size-bytes>
+         <page-max-cache-size>20</page-max-cache-size>
          <message-counter-history-day-limit>8</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -879,6 +879,7 @@
                                                           1024 * 1024,
                                                           1024 * 10,
                                                           5,
+                                                          5,
                                                           0,
                                                           false,
                                                           "PAGE");
@@ -945,6 +946,7 @@
                                                           100 * 1024,
                                                           10 * 1024,
                                                           5,
+                                                          5,
                                                           0,
                                                           false,
                                                           "PAGE");

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -480,6 +480,7 @@
       int deliveryAttempts = 1;
       long maxSizeBytes = 2;
       int pageSizeBytes = 3;
+      int pageMaxCacheSize = 7;
       long redeliveryDelay = 4;
       long redistributionDelay = 5;
       boolean sendToDLAOnNoRoute = true;
@@ -492,6 +493,7 @@
                                        deliveryAttempts,
                                        maxSizeBytes,
                                        pageSizeBytes,
+                                       pageMaxCacheSize,
                                        redeliveryDelay,
                                        redistributionDelay,
                                        sendToDLAOnNoRoute,
@@ -508,6 +510,7 @@
       assertEquals(lastValueQueue, info.isLastValueQueue());
       assertEquals(deliveryAttempts, info.getMaxDeliveryAttempts());
       assertEquals(maxSizeBytes, info.getMaxSizeBytes());
+      assertEquals(pageMaxCacheSize, info.getPageCacheMaxSize());
       assertEquals(pageSizeBytes, info.getPageSizeBytes());
       assertEquals(redeliveryDelay, info.getRedeliveryDelay());
       assertEquals(redistributionDelay, info.getRedistributionDelay());

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -512,6 +512,7 @@
                                         @Parameter(desc = "the delivery attempts", name = "deliveryAttempts") int deliveryAttempts,
                                         @Parameter(desc = "the max size in bytes", name = "maxSizeBytes") long maxSizeBytes,
                                         @Parameter(desc = "the page size in bytes", name = "pageSizeBytes") int pageSizeBytes,
+                                        int pageMaxCacheSize,
                                         @Parameter(desc = "the redelivery delay", name = "redeliveryDelay") long redeliveryDelay,
                                         @Parameter(desc = "the redistribution delay", name = "redistributionDelay") long redistributionDelay,
                                         @Parameter(desc = "do we send to the DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") boolean sendToDLAOnNoRoute,

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/PageCursorStressTest.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -90,7 +90,7 @@
    public void testReadCache() throws Exception
    {
 
-      final int NUM_MESSAGES = 1000;
+      final int NUM_MESSAGES = 100;
 
       int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
 
@@ -98,7 +98,8 @@
 
       PageCursorProviderImpl cursorProvider = new PageCursorProviderImpl(lookupPageStore(ADDRESS),
                                                                          server.getStorageManager(),
-                                                                         server.getExecutorFactory());
+                                                                         server.getExecutorFactory(),
+                                                                         5);
 
       for (int i = 0; i < numberOfPages; i++)
       {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -260,6 +260,7 @@
       assertEquals(1, conf.getAddressesSettings().get("a1").getRedeliveryDelay());
       assertEquals(81781728121878l, conf.getAddressesSettings().get("a1").getMaxSizeBytes());
       assertEquals(81738173872337l, conf.getAddressesSettings().get("a1").getPageSizeBytes());
+      assertEquals(10, conf.getAddressesSettings().get("a1").getPageCacheMaxSize());
       assertEquals(4, conf.getAddressesSettings().get("a1").getMessageCounterHistoryDayLimit());
 
       assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
@@ -267,6 +268,7 @@
       assertEquals(5, conf.getAddressesSettings().get("a2").getRedeliveryDelay());
       assertEquals(932489234928324l, conf.getAddressesSettings().get("a2").getMaxSizeBytes());
       assertEquals(7126716262626l, conf.getAddressesSettings().get("a2").getPageSizeBytes());
+      assertEquals(20, conf.getAddressesSettings().get("a2").getPageCacheMaxSize());
       assertEquals(8, conf.getAddressesSettings().get("a2").getMessageCounterHistoryDayLimit());
       
       

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java	2011-02-03 11:35:45 UTC (rev 10175)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/util/SoftValueMapTest.java	2011-02-03 22:17:34 UTC (rev 10176)
@@ -40,27 +40,111 @@
    {
       forceGC();
       long maxMemory = Runtime.getRuntime().maxMemory() - Runtime.getRuntime().freeMemory();
-      
+
       // each buffer will be 1/100th of the maxMemory
       int bufferSize = (int)(maxMemory / 100);
+
+      class Value implements SoftValueHashMap.ValueCache
+      {
+         byte[] payload;
+
+         Value(byte[] payload)
+         {
+            this.payload = payload;
+         }
+
+         /* (non-Javadoc)
+          * @see org.hornetq.utils.SoftValueHashMap.ValueCache#isLive()
+          */
+         public boolean isLive()
+         {
+            return false;
+         }
+      }
+
+      SoftValueHashMap<Long, Value> softCache = new SoftValueHashMap<Long, Value>(100);
+
+      final int MAX_ELEMENTS = 1000;
+
+      for (long i = 0; i < MAX_ELEMENTS; i++)
+      {
+         softCache.put(i, new Value(new byte[bufferSize]));
+      }
+
+      assertTrue(softCache.size() < MAX_ELEMENTS);
       
-      SoftValueHashMap<Long, byte[]> softCache = new SoftValueHashMap<Long, byte[]>();
+      System.out.println("SoftCache.size " + softCache.size());
+
+      System.out.println("Soft cache has " + softCache.size() + " elements");
+   }
+
+
+   public void testEvictionsLeastUsed()
+   {
+      forceGC();
+
+      class Value implements SoftValueHashMap.ValueCache
+      {
+         byte[] payload;
+         
+         boolean live;
+
+         Value(byte[] payload)
+         {
+            this.payload = payload;
+         }
+
+         /* (non-Javadoc)
+          * @see org.hornetq.utils.SoftValueHashMap.ValueCache#isLive()
+          */
+         public boolean isLive()
+         {
+            return live;
+         }
+         
+         public void setLive(boolean live)
+         {
+            this.live = live;
+         }
+      }
+
+      SoftValueHashMap<Long, Value> softCache = new SoftValueHashMap<Long, Value>(200);
       
-      final int MAX_ELEMENTS = 1000;
+      for (long i = 0 ; i < 100; i++)
+      {
+         Value v = new Value(new byte[1]);
+         v.setLive(true);
+         softCache.put(i, v);
+      }
       
-      for (long i = 0 ; i < MAX_ELEMENTS; i++)
+      for (long i = 100; i < 200; i++)
       {
-         softCache.put(i, new byte[bufferSize]);
+         Value v = new Value(new byte[1]);
+         softCache.put(i, v);
       }
       
+      assertNotNull(softCache.get(100l));
       
-      assertTrue(softCache.size() < MAX_ELEMENTS);
+      softCache.put(300l, new Value(new byte[1]));
       
+      // these are live, so they shouldn't go
+      
+      for (long i = 0; i < 100; i++)
+      {
+         assertNotNull(softCache.get(i));
+      }
+      
+      // this was accessed, so it shouldn't go
+      assertNotNull(softCache.get(100l));
+      
+      // this is the next one, so it should go
+      assertNull(softCache.get(101l));
+      
+      System.out.println("SoftCache.size " + softCache.size());
+
       System.out.println("Soft cache has " + softCache.size() + " elements");
    }
 
-   
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list