[hornetq-commits] JBoss hornetq SVN: r9754 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 5 18:18:28 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-05 18:18:27 -0400 (Tue, 05 Oct 2010)
New Revision: 9754

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
First implementation of the redelivery and ack with reload of the cursors

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -82,5 +82,5 @@
 
    void deletePageStore(SimpleString storeName) throws Exception;
    
-   void processReload();
+   void processReload() throws Exception;
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -64,7 +64,7 @@
    
    PageCursorProvider getCursorProvier();
    
-   void processReload();
+   void processReload() throws Exception;
 
    /**
     * @return false if a thread was already started, or if not in page mode

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -48,7 +48,7 @@
     */
    void reloadPreparedACK(Transaction tx, PagePosition position);
    
-   void processReload();
+   void processReload() throws Exception;
 
    /**
     * To be used on redeliveries

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -57,9 +57,13 @@
    // PageCursor recoverCursor(PagePosition position);
 
    Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
+   
+   ServerMessage getMessage(PagePosition pos) throws Exception;
 
-   void processReload();
+   void processReload() throws Exception;
 
+   void stop();
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagePosition.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -37,5 +37,8 @@
    PagePosition nextMessage();
 
    PagePosition nextPage();
+   
+   /** This will just test if the current position is the immediate next to the parameter position */
+   boolean isRightAfter(PagePosition previous);
 
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -14,8 +14,10 @@
 package org.hornetq.core.paging.cursor.impl;
 
 import java.util.Collections;
+import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.paging.PagingStore;
@@ -23,6 +25,7 @@
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
 
@@ -49,9 +52,12 @@
    private final PageCursorProvider cursorProvider;
 
    private volatile PagePosition lastPosition;
-   
+
    private List<PagePosition> recoveredACK;
 
+   // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
+   private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -74,11 +80,23 @@
     */
    public synchronized Pair<PagePosition, ServerMessage> moveNext() throws Exception
    {
+      PagePosition redeliveryPos = null;
+
+      // Redeliveries will take precedence
+      if ((redeliveryPos = redeliveries.poll()) != null)
+      {
+         return new Pair<PagePosition, ServerMessage>(redeliveryPos, cursorProvider.getMessage(redeliveryPos));
+      }
+
       if (lastPosition == null)
       {
-         lastPosition = recoverLastPosition();
+         // it will start at the first available page
+         long firstPage = pageStore.getFirstPage();
+         lastPosition = new PagePositionImpl(firstPage, -1);
       }
 
+      boolean match = false;
+
       Pair<PagePosition, ServerMessage> message = null;
       do
       {
@@ -87,8 +105,14 @@
          {
             lastPosition = message.a;
          }
+         match = match(message.b);
+
+         if (!match)
+         {
+            ignored(message.a);
+         }
       }
-      while (message != null && !match(message.b));
+      while (message != null && !match);
 
       return message;
    }
@@ -111,12 +135,10 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public void redeliver(final PagePosition position)
+   public synchronized void redeliver(final PagePosition position)
    {
-      // TODO Auto-generated method stub
-
+      this.redeliveries.add(position);
    }
-   
 
    /** 
     * Theres no need to synchronize this method as it's only called from journal load on startup
@@ -136,11 +158,49 @@
       installTXCallback(tx, position);
    }
 
-   public void processReload()
+   public void processReload() throws Exception
    {
       if (this.recoveredACK != null)
       {
+         System.out.println("********** processing reload!!!!!!!");
          Collections.sort(recoveredACK);
+
+         PagePosition previousPos = null;
+         for (PagePosition pos : recoveredACK)
+         {
+            lastPosition = pos;
+            if (previousPos != null)
+            {
+               if (!previousPos.isRightAfter(previousPos))
+               {
+                  PagePosition tmpPos = previousPos;
+                  // looking for holes on the ack list for redelivery
+                  while (true)
+                  {
+                     Pair<PagePosition, ServerMessage> msgCheck = cursorProvider.getAfter(tmpPos);
+                     // end of the hole, we can finish processing here
+                     if (msgCheck == null || msgCheck.a.equals(pos))
+                     {
+                        break;
+                     }
+                     else
+                     {
+                        if (match(msgCheck.b))
+                        {
+                           redeliver(msgCheck.a);
+                        }
+                     }
+                     tmpPos = msgCheck.a;
+                  }
+               }
+            }
+
+            previousPos = pos;
+            System.out.println("pos: " + pos);
+         }
+
+         recoveredACK.clear();
+         recoveredACK = null;
       }
    }
 
@@ -153,11 +213,14 @@
       // To be used with expressions
       return true;
    }
-   
-   
 
    // Private -------------------------------------------------------
-   
+
+   private void ignored(final PagePosition message)
+   {
+      // TODO: Update reference counts
+   }
+
    /**
     * @param committedACK
     */
@@ -167,27 +230,18 @@
       {
          recoveredACK = new LinkedList<PagePosition>();
       }
-      
+
       recoveredACK.add(committedACK);
    }
 
-
-   private PagePosition recoverLastPosition()
-   {
-      long firstPage = pageStore.getFirstPage();
-      return new PagePositionImpl(firstPage, -1);
-   }
-   
-
    /**
     * @param tx
     * @param position
     */
    private void installTXCallback(Transaction tx, PagePosition position)
    {
-    }
+   }
 
-
    // Inner classes -------------------------------------------------
 
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -78,7 +78,12 @@
       PageCursor activeCursor = activeCursors.get(cursorID);
       if (activeCursor == null)
       {
-         activeCursor = activeCursors.putIfAbsent(cursorID, new PageCursorImpl(this, pagingStore, storageManager, cursorID));
+         activeCursor = new PageCursorImpl(this, pagingStore, storageManager, cursorID);
+         PageCursor previousValue = activeCursors.putIfAbsent(cursorID, activeCursor);
+         if (previousValue != null)
+         {
+            activeCursor = previousValue;
+         }
       }
       
       return activeCursor;
@@ -118,6 +123,19 @@
       
       return new Pair<PagePosition, ServerMessage>(retPos, cache.getMessage(retPos.getMessageNr()));
    }
+   
+   public ServerMessage getMessage(final PagePosition pos) throws Exception
+   {
+      PageCache cache = getPageCache(pos.getPageNr());
+      
+      if (pos.getMessageNr() >= cache.getNumberOfMessages())
+      {
+         // sanity check, this should never happen unless there's a bug
+         throw new IllegalStateException("Invalid messageNumber passed = " + pos);
+      }
+      
+      return cache.getMessage(pos.getMessageNr());
+   }
 
    public PageCache getPageCache(final long pageId) throws Exception
    {
@@ -181,13 +199,18 @@
       return softCache.size();
    }
 
-   public void processReload()
+   public void processReload() throws Exception
    {
       for (PageCursor cursor : this.activeCursors.values())
       {
          cursor.processReload();
       }
    }
+   
+   public void stop()
+   {
+      activeCursors.clear();
+   }
 
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -82,6 +82,11 @@
    {
       return messageNr;
    }
+   
+   public boolean isRightAfter(final PagePosition previous)
+   {
+      return this.pageNr == previous.getPageNr() && this.messageNr == previous.getMessageNr() + 1;
+   }
 
    /* (non-Javadoc)
     * @see java.lang.Comparable#compareTo(java.lang.Object)
@@ -158,4 +163,12 @@
       return true;
    }
    
+   @Override
+   public String toString()
+   {
+      return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr + ", recordID=" + recordID + "]";
+   }
+
+
+   
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -229,7 +229,7 @@
       }
    }
    
-   public void processReload()
+   public void processReload() throws Exception
    {
       for (PagingStore store: stores.values())
       {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -317,6 +317,13 @@
 
    public boolean startDepaging()
    {
+      
+      // Disabled for now
+      
+      return false;
+      
+      
+      /*
       if (!running)
       {
          return false;
@@ -353,11 +360,11 @@
       finally
       {
          currentPageLock.readLock().unlock();
-      }
+      } */
    }
    
 
-   public void processReload()
+   public void processReload() throws Exception
    {
       cursorProvider.processReload();
    }
@@ -396,6 +403,8 @@
             currentPage.close();
             currentPage = null;
          }
+         
+         cursorProvider.stop();
       }
    }
 
@@ -1218,7 +1227,7 @@
 
    // Inner classes -------------------------------------------------
 
-   private class DepageRunnable implements Runnable
+/*   private class DepageRunnable implements Runnable
    {
       private final Executor followingExecutor;
 
@@ -1252,5 +1261,5 @@
             PagingStoreImpl.log.error(e, e);
          }
       }
-   }
+   } */
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -1114,7 +1114,7 @@
          deploymentManager.start();
       }
 
-      pagingManager.reloadStores();
+      pagingManager.processReload();
       
       pagingManager.resumeDepages();
 

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-05 17:33:33 UTC (rev 9753)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-10-05 22:18:27 UTC (rev 9754)
@@ -23,10 +23,12 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
+import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -50,6 +52,8 @@
    private SimpleString ADDRESS = new SimpleString("test-add");
 
    private HornetQServer server;
+   
+   private Queue queue;
 
    private static final int PAGE_MAX = -1;
 
@@ -135,6 +139,92 @@
    }
    
    
+   public void testRestart() throws Exception
+   {
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 100 * 1024);
+      
+      System.out.println("Number of pages = " + numberOfPages);
+      
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+      
+      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      
+      System.out.println("Cursor: " + cursor);
+      for (int i = 0 ; i < 500 ; i++)
+      {
+         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         cursor.ack(msg.a);
+      }
+      
+      server.stop();
+      
+      server.start();
+      
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      
+      for (int i = 500; i < NUM_MESSAGES; i++)
+      {
+         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         cursor.ack(msg.a);
+      }
+      
+   }
+   
+   
+   public void testRestartWithHoleOnAck() throws Exception
+   {
+
+      final int NUM_MESSAGES = 1000;
+
+      int numberOfPages = addMessages(NUM_MESSAGES, 10 * 1024);
+      
+      System.out.println("Number of pages = " + numberOfPages);
+      
+      PageCursorProvider cursorProvider = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier();
+      System.out.println("cursorProvider = " + cursorProvider);
+      
+      PageCursor cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      
+      System.out.println("Cursor: " + cursor);
+      for (int i = 0 ; i < 100 ; i++)
+      {
+         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         if (i < 10 || i > 20)
+         {
+            cursor.ack(msg.a);
+         }
+      }
+      
+      server.stop();
+      
+      server.start();
+      
+      cursor = this.server.getPagingManager().getPageStore(ADDRESS).getCursorProvier().getCursor(queue.getID());
+      
+      for (int i = 10; i <= 20; i++)
+      {
+         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         cursor.ack(msg.a);
+      }
+    
+      
+      for (int i = 100; i < NUM_MESSAGES; i++)
+      {
+         Pair<PagePosition, ServerMessage> msg =  cursor.moveNext();
+         assertEquals(i, msg.b.getIntProperty("key").intValue());
+         cursor.ack(msg.a);
+      }
+      
+   }
+   
+   
    public void testRollbackScenarios() throws Exception
    {
       
@@ -216,8 +306,10 @@
                             new HashMap<String, AddressSettings>());
 
       server.start();
+      
+      queue = server.createQueue(ADDRESS, ADDRESS, null, true, false);
 
-      createQueue(ADDRESS.toString(), ADDRESS.toString());
+      //createQueue(ADDRESS.toString(), ADDRESS.toString());
    }
 
    protected void tearDown() throws Exception



More information about the hornetq-commits mailing list