[hornetq-commits] JBoss hornetq SVN: r9753 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 5 13:33:34 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-05 13:33:33 -0400 (Tue, 05 Oct 2010)
New Revision: 9753

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
Reload Cursors

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingManager.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -81,4 +81,6 @@
    SimpleString[] getStoreNames();
 
    void deletePageStore(SimpleString storeName) throws Exception;
+   
+   void processReload();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -63,6 +63,8 @@
    Page createPage(final int page) throws Exception;
    
    PageCursorProvider getCursorProvier();
+   
+   void processReload();
 
    /**
     * @return false if a thread was already started, or if not in page mode

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursor.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -15,6 +15,7 @@
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
 
 /**
  * A PageCursor
@@ -26,28 +27,32 @@
 public interface PageCursor
 {
 
+   // Cursor query operations --------------------------------------
+   
    Pair<PagePosition, ServerMessage> moveNext() throws Exception;
 
-   PagePosition getFirstPosition();
-
    void ack(PagePosition position) throws Exception;
 
-   void ackTx(long tx, PagePosition position) throws Exception;
+   void ackTx(Transaction tx, PagePosition position) throws Exception;
    
+   // Reload operations
+   
    /**
     * @param position
     */
-   void recoverACK(PagePosition position);
+   void reloadACK(PagePosition position);
    
    /**
     * To be used to avoid a redelivery of a prepared ACK after load
     * @param position
     */
-   void recoverPreparedACK(PagePosition position);
+   void reloadPreparedACK(Transaction tx, PagePosition position);
+   
+   void processReload();
 
    /**
     * To be used on redeliveries
     * @param position
     */
-   void returnElement(PagePosition position);
+   void redeliver(PagePosition position);
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -58,6 +58,8 @@
 
    Pair<PagePosition, ServerMessage> getAfter(PagePosition pos) throws Exception;
 
+   void processReload();
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCacheImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -58,7 +58,7 @@
     */
    public Page getPage()
    {
-      return null;
+      return page;
    }
 
    /* (non-Javadoc)

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -13,6 +13,10 @@
 
 package org.hornetq.core.paging.cursor.impl;
 
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCursor;
@@ -20,6 +24,7 @@
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
 
 /**
  * A PageCursorImpl
@@ -44,6 +49,8 @@
    private final PageCursorProvider cursorProvider;
 
    private volatile PagePosition lastPosition;
+   
+   private List<PagePosition> recoveredACK;
 
    // Static --------------------------------------------------------
 
@@ -94,64 +101,93 @@
       store.storeCursorAcknowledge(cursorId, position);
    }
 
-   public void ackTx(final long tx, final PagePosition position) throws Exception
+   public void ackTx(final Transaction tx, final PagePosition position) throws Exception
    {
-      store.storeCursorAcknowledgeTransactional(tx, cursorId, position);
+      store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+      installTXCallback(tx, position);
+      // tx.afterCommit()
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public void returnElement(final PagePosition position)
+   public void redeliver(final PagePosition position)
    {
       // TODO Auto-generated method stub
 
    }
+   
 
+   /** 
+    * There's no need to synchronize this method as it's only called from journal load on startup
+    */
+   public void reloadACK(final PagePosition position)
+   {
+      internalAdd(position);
+
+   }
+
    /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPosition()
+    * @see org.hornetq.core.paging.cursor.PageCursor#reloadPreparedACK(org.hornetq.core.transaction.Transaction, org.hornetq.core.paging.cursor.PagePosition)
     */
-   public PagePosition getFirstPosition()
+   public void reloadPreparedACK(final Transaction tx, final PagePosition position)
    {
-      // TODO Auto-generated method stub
-      return null;
+      internalAdd(position);
+      installTXCallback(tx, position);
    }
 
+   public void processReload()
+   {
+      if (this.recoveredACK != null)
+      {
+         Collections.sort(recoveredACK);
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
    protected boolean match(final ServerMessage message)
    {
+      // To be used with expressions
       return true;
    }
+   
+   
 
    // Private -------------------------------------------------------
+   
+   /**
+    * @param committedACK
+    */
+   private void internalAdd(final PagePosition committedACK)
+   {
+      if (recoveredACK == null)
+      {
+         recoveredACK = new LinkedList<PagePosition>();
+      }
+      
+      recoveredACK.add(committedACK);
+   }
 
+
    private PagePosition recoverLastPosition()
    {
       long firstPage = pageStore.getFirstPage();
       return new PagePositionImpl(firstPage, -1);
    }
+   
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#recoverACK(org.hornetq.core.paging.cursor.PagePosition)
+   /**
+    * @param tx
+    * @param position
     */
-   public void recoverACK(final PagePosition position)
+   private void installTXCallback(Transaction tx, PagePosition position)
    {
-      // TODO Auto-generated method stub
+    }
 
-   }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#recoverPreparedACK(org.hornetq.core.paging.cursor.PagePosition)
-    */
-   public void recoverPreparedACK(final PagePosition position)
-   {
-      // TODO Auto-generated method stub
-
-   }
-
    // Inner classes -------------------------------------------------
 
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -94,6 +94,8 @@
     */
    public Pair<PagePosition, ServerMessage> getAfter(final PagePosition pos) throws Exception
    {
+      // TODO: consider page transactions here to avoid receiving an uncommitted message
+      // TODO: consider the case where a page came empty because of an ignored PageTX
       PagePosition retPos = pos.nextMessage();
       
       PageCache cache = getPageCache(pos.getPageNr());
@@ -179,6 +181,14 @@
       return softCache.size();
    }
 
+   public void processReload()
+   {
+      for (PageCursor cursor : this.activeCursors.values())
+      {
+         cursor.processReload();
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -228,7 +228,16 @@
          }
       }
    }
+   
+   public void processReload()
+   {
+      for (PagingStore store: stores.values())
+      {
+         store.processReload();
+      }
+   }
 
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -355,7 +355,14 @@
          currentPageLock.readLock().unlock();
       }
    }
+   
 
+   public void processReload()
+   {
+      cursorProvider.processReload();
+   }
+
+
    // HornetQComponent implementation
 
    public synchronized boolean isStarted()

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -1019,11 +1019,11 @@
                   SimpleString address = queueInfo.getAddress();
                   PagingStore store = pagingManager.getPageStore(address);
                   PageCursor cursor = store.getCursorProvier().getCursor(encoding.queueID);
-                  cursor.recoverACK(encoding.position);
+                  cursor.reloadACK(encoding.position);
                }
                else
                {
-                  log.warn("Can't find queue " + queueInfo.getId() + " while reloading ACKNOWLEDGE_CURSOR");
+                  log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
                }
                
                break;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -1114,6 +1114,8 @@
          deploymentManager.start();
       }
 
+      pagingManager.reloadStores();
+      
       pagingManager.resumeDepages();
 
       final ServerInfo dumper = new ServerInfo(this, pagingManager);

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -1027,6 +1027,15 @@
          return null;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.paging.PagingManager#processReload()
+       */
+      public void processReload()
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    class FakeStorageManager implements StorageManager

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2010-10-05 17:26:00 UTC (rev 9752)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2010-10-05 17:33:33 UTC (rev 9753)
@@ -323,6 +323,16 @@
          return null;
       }
 
+      
+      
+      
+      /* (non-Javadoc)
+       * @see org.hornetq.core.paging.PagingManager#processReload()
+       */
+      public void processReload()
+      {
+      }
+
    }
 
 }



More information about the hornetq-commits mailing list