[hornetq-commits] JBoss hornetq SVN: r11326 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq: core/paging/cursor/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 12 09:33:49 EDT 2011


Author: borges
Date: 2011-09-12 09:33:49 -0400 (Mon, 12 Sep 2011)
New Revision: 11326

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/OrderedExecutorFactory.java
Log:
HORNETQ-720 Fix dead-lock on PagingStore shutdown, remove dead code, reduce visibility

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java	2011-09-12 13:33:04 UTC (rev 11325)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java	2011-09-12 13:33:49 UTC (rev 11326)
@@ -95,12 +95,10 @@
     */
    Page depage() throws Exception;
 
-
    void forceAnotherPage() throws Exception;
 
    Page getCurrentPage();
 
-
    /** @return true if paging was started, or false if paging was already started before this call */
    boolean startPaging() throws Exception;
 
@@ -110,20 +108,19 @@
 
    void executeRunnableWhenMemoryAvailable(Runnable runnable);
 
-   /** This method will hold and producer, but it wait operations to finish before locking (write lock) */
-   void lock();
+   /**
+    * Write lock the PagingStore.
+    * @param timeout milliseconds to wait for the lock. If value is {@literal -1} then wait
+    *           indefinitely.
+    * @return {@code true} if the lock was obtained, {@code false} otherwise
+    */
+   boolean lock(long timeout);
 
    /**
-    *
-    * Call this method using the same thread used by the last call of {@link PagingStore#lock()}
-    *
+    * Call this method using the same thread used by the last call of {@link PagingStore#lock(long)}.
     */
     void unlock();
 
-    /** This is used mostly by tests.
-     *  We will wait any pending runnable to finish its execution */
-    void flushExecutors();
-
    /**
     * Files to synchronize with backup.
     * @return

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-09-12 13:33:04 UTC (rev 11325)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-09-12 13:33:49 UTC (rev 11326)
@@ -36,7 +36,7 @@
 
 /**
  * A PageProviderIMpl
- * 
+ *
  * TODO: this may be moved entirely into PagingStore as there's an one-to-one relationship here
  *       However I want to keep this isolated as much as possible during development
  *
@@ -302,6 +302,12 @@
                storageManager.clearContext();
             }
          }
+
+         @Override
+         public String toString()
+         {
+            return "PageCursorProvider:scheduleCleanup()";
+         }
       });
    }
 
@@ -309,8 +315,15 @@
    {
       ArrayList<Page> depagedPages = new ArrayList<Page>();
 
-      pagingStore.lock();
-
+      while (true)
+      {
+         if (pagingStore.lock(100))
+         {
+            break;
+         }
+         if (!pagingStore.isStarted())
+            return;
+      }
       synchronized (this)
       {
          try
@@ -342,7 +355,6 @@
                      break;
                   }
                }
-
                if (complete)
                {
 
@@ -378,6 +390,7 @@
             if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 &&
                 pagingStore.getCurrentPage().getNumberOfMessages() == 0)
             {
+
                pagingStore.stopPaging();
             }
             else

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-09-12 13:33:04 UTC (rev 11325)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-09-12 13:33:49 UTC (rev 11326)
@@ -57,12 +57,12 @@
 /**
  * A PageCursorImpl
  *
- * A page cursor will always store its 
+ * A page cursor will always store its
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
- * 
+ *
  */
-public class PageSubscriptionImpl implements PageSubscription
+class PageSubscriptionImpl implements PageSubscription
 {
    // Constants -----------------------------------------------------
    private static final Logger log = Logger.getLogger(PageSubscriptionImpl.class);
@@ -99,7 +99,7 @@
    private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
 
    private final PageSubscriptionCounter counter;
-   
+
    private final Executor executor;
 
    private final AtomicLong deliveredCount = new AtomicLong(0);
@@ -210,7 +210,7 @@
       }
    }
 
-   /** 
+   /**
     * It will cleanup all the records for completed pages
     * */
    public void cleanupEntries() throws Exception
@@ -224,7 +224,7 @@
       // First get the completed pages using a lock
       synchronized (this)
       {
-         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet()) 
+         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
          {
             PageCursorInfo info = entry.getValue();
             if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
@@ -363,7 +363,7 @@
    }
 
    /**
-    * 
+    *
     */
    private synchronized PagePosition getStartPosition()
    {
@@ -395,7 +395,7 @@
                   }
                }
 
-               if (isTrace) 
+               if (isTrace)
                {
                   trace("Returning initial position " + retValue);
                }
@@ -520,7 +520,7 @@
       }
    }
 
-   /** 
+   /**
     * Theres no need to synchronize this method as it's only called from journal load on startup
     */
    public void reloadACK(final PagePosition position)
@@ -550,7 +550,7 @@
       processACK(position);
    }
 
-   
+
    public void lateDeliveryRollback(PagePosition position)
    {
       PageCursorInfo cursorInfo = processACK(position);
@@ -574,9 +574,9 @@
       final long tx = store.generateUniqueID();
       try
       {
-   
+
          boolean isPersistent = false;
-   
+
          synchronized (PageSubscriptionImpl.this)
          {
             for (PageCursorInfo cursor : consumedPages.values())
@@ -591,12 +591,12 @@
                }
             }
          }
-    
+
          if (isPersistent)
          {
             store.commit(tx);
          }
-   
+
          cursorProvider.close(this);
       }
       catch (Exception e)
@@ -678,7 +678,7 @@
       printDebug(toString());
    }
 
-   public void printDebug(final String msg)
+   private void printDebug(final String msg)
    {
       System.out.println("Debug information on PageCurorImpl- " + msg);
       for (PageCursorInfo info : consumedPages.values())
@@ -752,7 +752,7 @@
             {
                log.trace("Scheduling cleanup on pageSubscription for address = " + pageStore.getAddress() + " queue = " + this.getQueue().getName());
             }
-            
+
             // there's a different page being acked, we will do the check right away
             if (autoCleanup)
             {
@@ -764,7 +764,7 @@
       PageCursorInfo info = getPageInfo(pos);
 
       info.addACK(pos);
-      
+
       return info;
    }
 
@@ -821,7 +821,7 @@
 
    // Inner classes -------------------------------------------------
 
-   /** 
+   /**
     * This will hold information about the pending ACKs towards a page.
     * This instance will be released as soon as the entire page is consumed, releasing the memory at that point
     * The ref counts are increased also when a message is ignored for any reason.
@@ -839,13 +839,13 @@
 
       private WeakReference<PageCache> cache;
 
-      private Set<PagePosition> removedReferences = new ConcurrentHashSet<PagePosition>();
+      private final Set<PagePosition> removedReferences = new ConcurrentHashSet<PagePosition>();
 
       // The page was live at the time of the creation
       private final boolean wasLive;
 
       // There's a pending TX to add elements on this page
-      private AtomicInteger pendingTX = new AtomicInteger(0);
+      private final AtomicInteger pendingTX = new AtomicInteger(0);
 
       // There's a pending delete on the async IO pipe
       // We're holding this object to avoid delete the pages before the IO is complete,
@@ -946,7 +946,7 @@
       }
 
       /**
-       * 
+       *
        */
       protected void checkDone()
       {
@@ -1021,6 +1021,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
        */
+      @Override
       public List<MessageReference> getRelatedMessageReferences()
       {
          return Collections.emptyList();
@@ -1153,9 +1154,9 @@
                {
                   ignored = true;
                }
-               
+
                PageCursorInfo info = getPageInfo(message.getPosition(), false);
-               
+
                if (info != null && info.isRemoved(message.getPosition()))
                {
                   continue;
@@ -1191,7 +1192,7 @@
                   // Say you have a Browser that will only read the files... there's no need to control PageCursors is
                   // nothing
                   // is being changed. That's why the false is passed as a parameter here
-                 
+
                   if (info != null && info.isRemoved(message.getPosition()))
                   {
                      valid = false;
@@ -1228,7 +1229,7 @@
          }
       }
 
-      /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. 
+      /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well.
        *  It would be a rare race condition but I would prefer avoiding that scenario */
       public synchronized boolean hasNext()
       {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java	2011-09-12 13:33:04 UTC (rev 11325)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java	2011-09-12 13:33:49 UTC (rev 11326)
@@ -302,7 +302,7 @@
       syncLock.writeLock().lock();
       for (PagingStore store : stores.values())
       {
-         store.lock();
+         store.lock(-1);
       }
    }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-12 13:33:04 UTC (rev 11325)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-12 13:33:49 UTC (rev 11326)
@@ -24,6 +24,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -224,9 +225,21 @@
 
    // PagingStore implementation ------------------------------------
 
-   public void lock()
+   public boolean lock(long timeout)
    {
-      lock.writeLock().lock();
+      if (timeout == -1)
+      {
+         lock.writeLock().lock();
+         return true;
+      }
+      try
+      {
+         return lock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS);
+      }
+      catch (InterruptedException e)
+      {
+         return false;
+      }
    }
 
    public void unlock()
@@ -373,7 +386,7 @@
 
    public synchronized void stop() throws Exception
    {
-      lock();
+      lock(-1);
       try
       {
          if (running)
@@ -397,6 +410,7 @@
       }
    }
 
+   /** Waits for any pending runnable to finish its execution. */
    public void flushExecutors()
    {
       cursorProvider.flushExecutors();
@@ -571,7 +585,7 @@
 
    public Page createPage(final int pageNumber) throws Exception
    {
-      lock();
+      lock(-1);
       try
       {
          String fileName = createFileName(pageNumber);
@@ -1131,7 +1145,7 @@
    @Override
    public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception
    {
-      lock();
+      lock(-1);
       try
       {
          for (Integer id : pageIds)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/OrderedExecutorFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/OrderedExecutorFactory.java	2011-09-12 13:33:04 UTC (rev 11325)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/OrderedExecutorFactory.java	2011-09-12 13:33:49 UTC (rev 11326)
@@ -23,9 +23,9 @@
  *
  * @author <a href="david.lloyd at jboss.com">David Lloyd</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
+ *
  * @version <tt>$Revision$</tt>
- * 
+ *
  */
 public final class OrderedExecutorFactory implements ExecutorFactory
 {
@@ -125,5 +125,10 @@
             }
          }
       }
+
+      public String toString()
+      {
+         return "OrderedExecutor(running=" + running + ", tasks=" + tasks + ")";
+      }
    }
 }



More information about the hornetq-commits mailing list