[hornetq-commits] JBoss hornetq SVN: r12309 - trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 16 11:23:42 EDT 2012


Author: borges
Date: 2012-03-16 11:23:41 -0400 (Fri, 16 Mar 2012)
New Revision: 12309

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
HORNETQ-720 Avoid deadlocks in isPaging(), and make it faster

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2012-03-16 15:23:31 UTC (rev 12308)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2012-03-16 15:23:41 UTC (rev 12309)
@@ -54,15 +54,14 @@
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.utils.Future;
 
 /**
- * 
+ *
  * @see PagingStore
- * 
+ *
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
@@ -116,6 +115,7 @@
 
    private volatile Page currentPage;
 
+    private final Object pagingGuard = new Object();
    private volatile boolean paging = false;
 
    private final PageCursorProvider cursorProvider;
@@ -208,7 +208,7 @@
       pageSize = addressSettings.getPageSizeBytes();
 
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
-      
+
       if (cursorProvider != null)
       {
          cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
@@ -286,7 +286,7 @@
    {
       return pageSize;
    }
-   
+
    public String getFolder()
    {
       SequentialFileFactory factoryUsed = this.fileFactory;
@@ -302,28 +302,19 @@
 
    public boolean isPaging()
    {
-      lock.readLock().lock();
-
-      try
+      synchronized (pagingGuard)
       {
          if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK)
          {
             return false;
          }
-         else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
+         if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
          {
             return isFull();
          }
-         else
-         {
-            return paging;
-         }
-      }
-      finally
-      {
-         lock.readLock().unlock();
-      }
-   }
+         return paging;
+        }
+    }
 
    public int getNumberOfPages()
    {
@@ -527,7 +518,10 @@
       lock.writeLock().lock();
       try
       {
-         paging = false;
+         synchronized (pagingGuard)
+         {
+            paging = false;
+         }
       }
       finally
       {
@@ -542,18 +536,13 @@
          return false;
       }
 
-      lock.readLock().lock();
-      try
+      synchronized (pagingGuard)
       {
          if (paging)
          {
             return false;
          }
       }
-      finally
-      {
-         lock.readLock().unlock();
-      }
 
       // if the first check failed, we do it again under a global currentPageLock
       // (writeLock) this time
@@ -561,6 +550,8 @@
 
       try
       {
+         synchronized (pagingGuard)
+         {
          if (paging)
          {
             return false;
@@ -584,6 +575,7 @@
          paging = true;
 
          return true;
+         }
       }
       finally
       {
@@ -595,7 +587,7 @@
    {
       return currentPage;
    }
-   
+
    public boolean checkPageFileExists(final int pageNumber)
    {
       String fileName = createFileName(pageNumber);
@@ -827,9 +819,7 @@
 
    }
 
-   private
-            boolean
-            page(ServerMessage message, final RoutingContext ctx, RouteContextList listCtx, final boolean sync)
+    private boolean page(ServerMessage message, final RoutingContext ctx, RouteContextList listCtx, final boolean sync)
                                                                                                                throws Exception
    {
       if (!running)
@@ -849,7 +839,7 @@
 
                PagingStoreImpl.log.warn("Messages are being dropped on address " + getStoreName());
             }
-            
+
             if (log.isDebugEnabled())
             {
                log.debug("Message " + message + " beig dropped for fullAddressPolicy==DROP");
@@ -869,20 +859,13 @@
       }
 
       // We need to ensure a read lock, as depage could change the paging state
-      lock.readLock().lock();
+        synchronized (pagingGuard) {
 
-      try
-      {
          // First check done concurrently, to avoid synchronization and increase throughput
-         if (!paging)
-         {
+            if (!paging) {
             return false;
          }
-      }
-      finally
-      {
-         lock.readLock().unlock();
-      }
+        }
 
       Transaction tx = ctx.getTransaction();
 
@@ -890,9 +873,12 @@
 
       try
       {
-         if (!paging)
+         synchronized (pagingGuard)
          {
-            return false;
+            if (!paging)
+            {
+               return false;
+            }
          }
 
          if (!message.isDurable())
@@ -904,7 +890,7 @@
 
 
          PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
-         
+
          if (message.isLargeMessage())
          {
             ((LargeServerMessage)message).setPaged();
@@ -918,16 +904,16 @@
             openNewPage();
             currentPageSize.addAndGet(bytesToWrite);
          }
- 
+
          currentPage.write(pagedMessage);
 
          if (isTrace)
          {
-            log.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() + 
+            log.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() +
                       " pageId=" + currentPage.getPageId());
          }
-         
-        
+
+
          if (tx != null)
          {
             installPageTransaction(tx, listCtx);
@@ -990,15 +976,15 @@
    private static class FinishPageMessageOperation implements TransactionOperation
    {
       public final PageTransactionInfo pageTransaction;
-      
+
       private final StorageManager storageManager;
-      
+
       private final PagingManager pagingManager;
-      
+
       private final Set<PagingStore> usedStores = new HashSet<PagingStore>();
 
       private boolean stored = false;
-      
+
       public void addStore(PagingStore store)
       {
          this.usedStores.add(store);
@@ -1123,9 +1109,9 @@
    }
 
    /**
-    * 
+    *
     * Note: Decimalformat is not thread safe, Use synchronization before calling this method
-    * 
+    *
     * @param pageID
     * @return
     */
@@ -1144,6 +1130,7 @@
    {
       return maxSize > 0 && getAddressSize() > maxSize;
    }
+
    @Override
    public Collection<Integer> getCurrentIds() throws Exception
    {



More information about the hornetq-commits mailing list