[hornetq-commits] JBoss hornetq SVN: r9795 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 19 17:28:16 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-19 17:28:15 -0400 (Tue, 19 Oct 2010)
New Revision: 9795

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
Implementing cleanup after closing a cursor

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-19 21:28:15 UTC (rev 9795)
@@ -29,8 +29,15 @@
 
    // Cursor query operations --------------------------------------
    
+   // To be called before the server is shut down
    void stop();
    
+   /** Returns the cursor id; it is 0 for a non-persistent cursor. */
+   public long getId();
+   
+   // To be called when the cursor is closed for good. Most likely when the queue is deleted
+   void close() throws Exception;
+   
    Pair<PagePosition, PagedMessage> moveNext() throws Exception;
 
    void ack(PagePosition position) throws Exception;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-19 21:28:15 UTC (rev 9795)
@@ -66,6 +66,15 @@
 
    void scheduleCleanup();
 
+   /**
+    * @param pageCursorImpl the cursor being closed
+    */
+   void close(PageCursor pageCursorImpl);
+   
+   // to be used on tests -------------------------------------------
+   
+   int getCacheSize();
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-19 21:28:15 UTC (rev 9795)
@@ -24,6 +24,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.Pair;
@@ -107,6 +108,11 @@
 
    // Public --------------------------------------------------------
 
+   public PageCursorProvider getProvider()
+   {
+      return this.cursorProvider;
+   }
+   
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
     */
@@ -196,10 +202,9 @@
     */
    public long getFirstPage()
    {
-      Long firstKey = consumedPages.firstKey();
-      if (firstKey == null)
+      if (consumedPages.isEmpty())
       {
-         return Long.MAX_VALUE;
+         return 0;
       }
       else
       {
@@ -221,7 +226,6 @@
     */
    public void reloadACK(final PagePosition position)
    {
-      System.out.println("reloading " + position);
       if (recoveredACK == null)
       {
          recoveredACK = new LinkedList<PagePosition>();
@@ -246,8 +250,95 @@
    {
       processACK(position);
    }
+   
+   
+   /**
+    * All the data associated with the cursor should go away here
+    */
+   public void close()  throws Exception
+   {
+      final long tx = store.generateUniqueID();
+      
+      final ArrayList<Exception> ex = new ArrayList<Exception>();
+      
+      final AtomicBoolean isPersistent = new AtomicBoolean(false);
+      
 
+      // We can't delete the records on the caller's thread
+      // because a task on the executor may be holding the synchronized lock on PageCursorImpl,
+      // which would lead to a deadlock.
+      // So we delete them inside the executor as well
+      // and wait for the result.
+      // The caller handles any IO exceptions and propagates them to the original thread's caller
+      executor.execute(new Runnable()
+      {
+
+         public void run()
+         {
+            try
+            {
+               synchronized (PageCursorImpl.this)
+               {
+                  for (PageCursorInfo cursor : consumedPages.values())
+                  {
+                     for (PagePosition info : cursor.acks)
+                     {
+                        if (info.getRecordID() != 0)
+                        {
+                           isPersistent.set(true);
+                           store.deleteCursorAcknowledgeTransactional(tx, info.getRecordID());
+                        }
+                     }
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               ex.add(e);
+               log.warn(e.getMessage(), e);
+            }
+         }
+      });
+       
+      Future future = new Future();
+      
+      executor.execute(future);
+      
+      while (!future.await(5000))
+      {
+         log.warn("Timeout on waiting cursor " + this + " to be closed");
+      }
+      
+      
+      if (isPersistent.get())
+      {
+         // Another reason to perform the commit on the calling thread is that the OperationContext may only send the result to the client when
+         // the IO on commit is done
+         if (ex.size() == 0)
+         {
+            store.commit(tx);
+         }
+         else
+         {
+            store.rollback(tx);
+            throw ex.get(0);
+         }
+      }
+      
+      cursorProvider.close(this);
+   }
    
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageCursor#getId()
+    */
+   public long getId()
+   {
+      return cursorId;
+   }
+
+
+   
    public void processReload() throws Exception
    {
       if (recoveredACK != null)
@@ -317,7 +408,10 @@
    {
       Future future = new Future();
       executor.execute(future);
-      future.await(1000);
+      while (!future.await(1000))
+      {
+         log.warn("Waiting page cursor to finish executors - " + this);
+      }
    }
 
    public void printDebug()
@@ -340,7 +434,6 @@
       if (pageInfo == null)
       {
          PageCache cache = cursorProvider.getPageCache(pos);
-         System.out.println("Number of Messages = " + cache.getNumberOfMessages());
          pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages(), cache);
          consumedPages.put(pos.getPageNr(), pageInfo);
       }
@@ -451,7 +544,7 @@
             {
                if (entry.getKey() == lastAckedPosition.getPageNr())
                {
-                  System.out.println("We can't clear page " + entry.getKey() + " now since it's the current page");
+                  trace("We can't clear page " + entry.getKey() + " now since it's the current page");
                }
                else
                {
@@ -499,7 +592,7 @@
                         {
                            PageCursorImpl.trace("Removing page " + completePage.getPageId());
                         }
-                        System.out.println("Removing page " + completePage.getPageId());
+                        trace("Removing page " + completePage.getPageId());
                         consumedPages.remove(completePage.getPageId());
                      }
                   }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-19 21:28:15 UTC (rev 9795)
@@ -33,6 +33,7 @@
 import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.ConcurrentSet;
 import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
 import org.hornetq.utils.SoftValueHashMap;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
 
@@ -67,7 +68,7 @@
    private SoftValueHashMap<Long, PageCache> softCache = new SoftValueHashMap<Long, PageCache>();
 
    private ConcurrentMap<Long, PageCursor> activeCursors = new ConcurrentHashMap<Long, PageCursor>();
-   
+
    private ConcurrentSet<PageCursor> nonPersistentCursors = new ConcurrentHashSet<PageCursor>();
 
    // Static --------------------------------------------------------
@@ -83,7 +84,7 @@
       this.storageManager = storageManager;
       this.executorFactory = executorFactory;
       this.executor = executorFactory.getExecutor();
-   }
+    }
 
    // Public --------------------------------------------------------
 
@@ -251,14 +252,42 @@
          cursor.stop();
       }
 
-      activeCursors.clear();
+      for (PageCursor cursor : nonPersistentCursors)
+      {
+         cursor.stop();
+      }
+
+      Future future = new Future();
+
+      executor.execute(future);
+
+      while (!future.await(10000))
+      {
+         log.warn("Waiting cursor provider " + this + " to finish executors");
+      }
+
    }
 
+   public void close(PageCursor cursor)
+   {
+      if (cursor.getId() != 0)
+      {
+         activeCursors.remove(cursor.getId());
+      }
+      else
+      {
+         nonPersistentCursors.remove(cursor);
+      }
+
+      scheduleCleanup();
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#scheduleCleanup()
     */
    public void scheduleCleanup()
    {
+
       executor.execute(new Runnable()
       {
          public void run()
@@ -274,17 +303,18 @@
 
       try
       {
-         System.out.println("MinPage = " + minPage + " firstPage = "  + pagingStore.getFirstPage());
          for (long i = pagingStore.getFirstPage(); i < minPage; i++)
          {
             Page page = pagingStore.depage();
-            System.out.println("Deleting files associated with page " + page);
-            page.delete();
+            if (page != null)
+            {
+               page.delete();
+            }
          }
       }
-      catch (Exception e)
+      catch (Exception ex)
       {
-         log.warn("Couldn't complete cleanup on paging", e);
+         log.warn("Couldn't complete cleanup on paging", ex);
       }
    }
 
@@ -316,15 +346,13 @@
          cursorList.addAll(activeCursors.values());
          cursorList.addAll(nonPersistentCursors);
       }
-      
+
       if (cursorList.size() == 0)
       {
          return 0l;
       }
 
       long minPage = Long.MAX_VALUE;
-      
-      System.out.println("CursorList : " + cursorList.size());
 
       for (PageCursor cursor : cursorList)
       {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-19 21:28:15 UTC (rev 9795)
@@ -56,6 +56,7 @@
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.Future;
 
 /**
  * 
@@ -210,6 +211,11 @@
    }
 
    // Public --------------------------------------------------------
+   
+   public String toString()
+   {
+      return "PagingStoreImpl(" + this.address + ")";
+   }
 
    // PagingStore implementation ------------------------------------
    
@@ -393,30 +399,26 @@
    {
       if (running)
       {
+         
+         cursorProvider.stop();
+
          running = false;
 
-         final CountDownLatch latch = new CountDownLatch(1);
+         Future future = new Future();
 
-         executor.execute(new Runnable()
-         {
-            public void run()
-            {
-               latch.countDown();
-            }
-         });
+         executor.execute(future);
 
-         if (!latch.await(60, TimeUnit.SECONDS))
+         if (!future.await(60000))
          {
             PagingStoreImpl.log.warn("Timed out on waiting PagingStore " + address + " to shutdown");
          }
+         
 
          if (currentPage != null)
          {
             currentPage.close();
             currentPage = null;
          }
-         
-         cursorProvider.stop();
       }
    }
 

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-16 04:00:15 UTC (rev 9794)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-19 21:28:15 UTC (rev 9795)
@@ -470,6 +470,51 @@
       // Validate the pages are being cleared (with multiple cursors)
    }
    
+   
+   public void testCloseNonPersistentConsumer() throws Exception
+   {
+
+      final int NUM_MESSAGES = 100;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
+      PageCursorProvider cursorProvider = lookupPageStore(ADDRESS).getCursorProvier();
+      
+      PageCursor cursor = cursorProvider.createNonPersistentCursor();
+      PageCursorImpl cursor2 = (PageCursorImpl)cursorProvider.createNonPersistentCursor();
+      
+      Pair<PagePosition, PagedMessage> msg;
+      
+      int key = 0;
+      while ((msg = cursor.moveNext()) != null)
+      {
+         assertEquals(key++, msg.b.getMessage().getIntProperty("key").intValue());
+         cursor.ack(msg.a);
+      }
+      assertEquals(NUM_MESSAGES, key);
+      
+      
+      forceGC();
+      
+      assertTrue(cursorProvider.getCacheSize() < numberOfPages);
+      
+      for (int i = 0 ; i < 10; i++)
+      {
+         msg = cursor2.moveNext();
+         assertEquals(i, msg.b.getMessage().getIntProperty("key").intValue());
+      }
+        
+      assertSame(cursor2.getProvider(), cursorProvider);
+
+      cursor2.close();
+      
+      server.stop();
+
+   }
+  
+   
    public void testLeavePageStateAndRestart() throws Exception
    {
       // Validate the cursor are working fine when all the pages are gone, and then paging being restarted   



More information about the hornetq-commits mailing list