[hornetq-commits] JBoss hornetq SVN: r9779 - in branches/Branch_New_Paging: src/main/org/hornetq/core/persistence and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 12 14:17:08 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-10-12 14:17:07 -0400 (Tue, 12 Oct 2010)
New Revision: 9779

Added:
   branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Implementing cleanup after a full page was consumed

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorImpl.java	2010-10-12 18:17:07 UTC (rev 9779)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.paging.cursor.impl;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -26,6 +27,7 @@
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursor;
@@ -35,7 +37,9 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.impl.TransactionImpl;
 
 /**
  * A PageCursorImpl
@@ -48,6 +52,7 @@
 public class PageCursorImpl implements PageCursor
 {
    // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PageCursorImpl.class);
 
    // Attributes ----------------------------------------------------
 
@@ -58,14 +63,14 @@
    private final PagingStore pageStore;
 
    private final PageCursorProvider cursorProvider;
-   
+
    private final Executor executor;
 
    private volatile PagePosition lastPosition;
 
    private List<PagePosition> recoveredACK;
 
-   private SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
+   private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
 
    // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
    private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -112,7 +117,7 @@
       boolean match = false;
 
       Pair<PagePosition, ServerMessage> message = null;
-      
+
       do
       {
          message = cursorProvider.getAfter(lastPosition);
@@ -128,7 +133,7 @@
                processACK(message.a);
             }
          }
-         
+
       }
       while (message != null && !match);
 
@@ -140,14 +145,20 @@
     */
    public void ack(final PagePosition position) throws Exception
    {
-      store.storeCursorAcknowledge(cursorId, position);
+
+      // if we are dealing with a persistent cursor
+      if (cursorId != 0)
+      {
+         store.storeCursorAcknowledge(cursorId, position);
+      }
+
       store.afterCompleteOperations(new IOAsyncTask()
       {
-         
-         public void onError(int errorCode, String errorMessage)
+
+         public void onError(final int errorCode, final String errorMessage)
          {
          }
-         
+
          public void done()
          {
             processACK(position);
@@ -157,7 +168,11 @@
 
    public void ackTx(final Transaction tx, final PagePosition position) throws Exception
    {
-      store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+      // if the cursor is persistent
+      if (cursorId != 0)
+      {
+         store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+      }
       installTXCallback(tx, position);
 
    }
@@ -167,7 +182,7 @@
     */
    public synchronized void redeliver(final PagePosition position)
    {
-      this.redeliveries.add(position);
+      redeliveries.add(position);
    }
 
    /** 
@@ -175,8 +190,12 @@
     */
    public void reloadACK(final PagePosition position)
    {
-      internalAdd(position);
+      if (recoveredACK == null)
+      {
+         recoveredACK = new LinkedList<PagePosition>();
+      }
 
+      recoveredACK.add(position);
    }
 
    /* (non-Javadoc)
@@ -190,7 +209,7 @@
 
    public void processReload() throws Exception
    {
-      if (this.recoveredACK != null)
+      if (recoveredACK != null)
       {
          System.out.println("********** processing reload!!!!!!!");
          Collections.sort(recoveredACK);
@@ -199,7 +218,7 @@
          for (PagePosition pos : recoveredACK)
          {
             PageCursorInfo positions = getPageInfo(pos);
-            
+
             positions.addACK(pos);
 
             lastPosition = pos;
@@ -230,7 +249,8 @@
                         else
                         {
                            // The reference was ignored. But we must take a count from the reference count
-                           // otherwise the page will never be deleted hence we would never leave paging even if everything was consumed
+                           // otherwise the page will never be deleted hence we would never leave paging even if
+                           // everything was consumed
                            positions.confirmed.incrementAndGet();
                         }
                      }
@@ -252,25 +272,23 @@
     * @param page
     * @return
     */
-   private PageCursorInfo getPageInfo(PagePosition pos)
+   private PageCursorInfo getPageInfo(final PagePosition pos)
    {
       PageCursorInfo pageInfo = consumedPages.get(pos.getPageNr());
-      
+
       if (pageInfo == null)
       {
          PageCache cache = cursorProvider.getPageCache(pos);
          pageInfo = new PageCursorInfo(pos.getPageNr(), cache.getNumberOfMessages());
          consumedPages.put(pos.getPageNr(), pageInfo);
       }
-      
+
       return pageInfo;
    }
 
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
-   
 
    protected boolean match(final ServerMessage message)
    {
@@ -279,88 +297,149 @@
    }
 
    // Private -------------------------------------------------------
-    
+
    // To be called only after the ACK has been processed and guaranteed to be on storae
    // The only exception is on non storage events such as not matching messages
    private void processACK(final PagePosition pos)
    {
       PageCursorInfo info = getPageInfo(pos);
-      
+
       info.addACK(pos);
    }
 
    /**
-    * @param committedACK
+    * @param tx
+    * @param position
     */
-   private void internalAdd(final PagePosition committedACK)
+   private void installTXCallback(final Transaction tx, final PagePosition position)
    {
-      if (recoveredACK == null)
+      if (position.getRecordID() > 0)
       {
-         recoveredACK = new LinkedList<PagePosition>();
+         // It needs to persist, otherwise the cursor will return to the fist page position
+         tx.setContainsPersistent();
       }
 
-      recoveredACK.add(committedACK);
-   }
+      PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
 
-   /**
-    * @param tx
-    * @param position
-    */
-   private void installTXCallback(Transaction tx, PagePosition position)
-   {
-      // It needs to persist, otherwise the cursor will return to the fist page position
-      tx.setContainsPersistent();
-      
-      PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
-      
       if (cursorTX == null)
       {
          cursorTX = new PageCursorTX();
-         tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS,cursorTX);
+         tx.putProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS, cursorTX);
          tx.addOperation(cursorTX);
       }
-      
+
       cursorTX.addPositionConfirmation(this, position);
-      
-      
+
    }
-   
-   // A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
-   private void onPageDone(PageCursorInfo info)
+
+   /**
+    *  A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
+    * @param info
+    */
+   private void onPageDone(final PageCursorInfo info)
    {
       System.out.println("Page " + info.getPageId() + " has completed");
+
+      executor.execute(new Runnable()
+      {
+
+         public void run()
+         {
+            try
+            {
+               cleanupPages();
+            }
+            catch (Exception e)
+            {
+               PageCursorImpl.log.warn("Error on cleaning up cursor pages");
+            }
+         }
+      });
    }
 
+   /** 
+    * It will cleanup all the records for completed pages
+    * */
+   private void cleanupPages() throws Exception
+   {
+      Transaction tx = new TransactionImpl(store);
+
+      boolean persist = false;
+
+      final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
+
+      // First get the completed pages
+      synchronized (this)
+      {
+         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+         {
+            if (entry.getValue().isDone())
+            {
+               completedPages.add(entry.getValue());
+            }
+         }
+      }
+
+      for (PageCursorInfo info : completedPages)
+      {
+         for (PagePosition pos : info.acks)
+         {
+            if (pos.getRecordID() > 0)
+            {
+               store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+               if (!persist)
+               {
+                  // only need to set it once
+                  tx.setContainsPersistent();
+                  persist = true;
+               }
+            }
+         }
+      }
+
+      tx.addOperation(new TransactionOperationAbstract()
+      {
+
+         @Override
+         public void afterCommit(final Transaction tx)
+         {
+            synchronized (PageCursorImpl.this)
+            {
+               for (PageCursorInfo completePage : completedPages)
+               {
+                  System.out.println("Removing page " + completePage.getPageId());
+                  consumedPages.remove(completePage.getPageId());
+               }
+            }
+         }
+      });
+
+      tx.commit();
+
+   }
+
    // Inner classes -------------------------------------------------
-   
-   
+
    private class PageCursorInfo
    {
       // Number of messages existent on this page
       private final int numberOfMessages;
-      
+
       private final long pageId;
-      
+
       // Confirmed ACKs on this page
       private final List<PagePosition> acks = Collections.synchronizedList(new LinkedList<PagePosition>());
-      
-      // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or expressions 
+
+      // We need a separate counter as the cursor may be ignoring certain values because of incomplete transactions or
+      // expressions
       private final AtomicInteger confirmed = new AtomicInteger(0);
-      
+
       public PageCursorInfo(final long pageId, final int numberOfMessages)
       {
          this.pageId = pageId;
          this.numberOfMessages = numberOfMessages;
       }
 
-      /**
-       * @return the numberOfMessages
-       */
-      public int getNumberOfMessages()
-      {
-         return numberOfMessages;
-      }
-      
       public boolean isDone()
       {
          return numberOfMessages == confirmed.get();
@@ -373,91 +452,91 @@
       {
          return pageId;
       }
-      
+
       public void addACK(final PagePosition posACK)
       {
          if (posACK.getRecordID() > 0)
          {
             // We store these elements for later cleanup
-            this.acks.add(posACK);
+            acks.add(posACK);
          }
-         
+
          if (numberOfMessages == confirmed.incrementAndGet())
          {
-            PageCursorImpl.this.onPageDone(this);
+            onPageDone(this);
          }
       }
-      
-    }
-   
+
+   }
+
    static class PageCursorTX implements TransactionOperation
    {
       HashMap<PageCursorImpl, List<PagePosition>> pendingPositions = new HashMap<PageCursorImpl, List<PagePosition>>();
-      
-      public void addPositionConfirmation(PageCursorImpl cursor, PagePosition position)
+
+      public void addPositionConfirmation(final PageCursorImpl cursor, final PagePosition position)
       {
          List<PagePosition> list = pendingPositions.get(cursor);
-         
+
          if (list == null)
          {
             list = new LinkedList<PagePosition>();
             pendingPositions.put(cursor, list);
          }
-         
+
          list.add(position);
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
        */
-      public void beforePrepare(Transaction tx) throws Exception
+      public void beforePrepare(final Transaction tx) throws Exception
       {
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
        */
-      public void afterPrepare(Transaction tx)
+      public void afterPrepare(final Transaction tx)
       {
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
        */
-      public void beforeCommit(Transaction tx) throws Exception
+      public void beforeCommit(final Transaction tx) throws Exception
       {
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
        */
-      public void afterCommit(Transaction tx)
+      public void afterCommit(final Transaction tx)
       {
-         for (Entry<PageCursorImpl, List<PagePosition>> entry : this.pendingPositions.entrySet())
+         for (Entry<PageCursorImpl, List<PagePosition>> entry : pendingPositions.entrySet())
          {
             PageCursorImpl cursor = entry.getKey();
-            
+
             List<PagePosition> positions = entry.getValue();
-            
+
             for (PagePosition confirmed : positions)
             {
                cursor.processACK(confirmed);
             }
-            
+
          }
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
        */
-      public void beforeRollback(Transaction tx) throws Exception
+      public void beforeRollback(final Transaction tx) throws Exception
       {
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
        */
-      public void afterRollback(Transaction tx)
+      public void afterRollback(final Transaction tx)
       {
       }
    }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java	2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java	2010-10-12 18:17:07 UTC (rev 9779)
@@ -116,6 +116,8 @@
    void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception;
 
    void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception;
+   
+   void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception;
 
    void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-10-12 18:17:07 UTC (rev 9779)
@@ -631,7 +631,17 @@
       position.setRecordID(ackID);
       messageJournal.appendAddRecordTransactional(txID, ackID, ACKNOWLEDGE_CURSOR, new CursorAckRecordEncoding(queueID, position));
    }
+   
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
+    */
+   public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
+   {
+      messageJournal.appendDeleteRecordTransactional(txID, ackID);
+   }
+
+
    public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
    {
       long id = generateUniqueID();

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-10-12 18:17:07 UTC (rev 9779)
@@ -470,4 +470,13 @@
       
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
+    */
+   public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }

Added: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java	                        (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java	2010-10-12 18:17:07 UTC (rev 9779)
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.transaction;
+
+/**
+ * Just a helper, when you don't want to implement all the methods on a transaction operation.
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public abstract class TransactionOperationAbstract implements TransactionOperation
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   public void beforePrepare(Transaction tx) throws Exception
+   {
+
+   }
+
+   /** After prepare shouldn't throw any exception. Any verification has to be done on before prepare */
+   public void afterPrepare(Transaction tx)
+   {
+
+   }
+
+   public void beforeCommit(Transaction tx) throws Exception
+   {
+   }
+
+   /** After commit shouldn't throw any exception. Any verification has to be done on before commit */
+   public void afterCommit(Transaction tx)
+   {
+   };
+
+   public void beforeRollback(Transaction tx) throws Exception
+   {
+   };
+
+   /** After rollback shouldn't throw any exception. Any verification has to be done on before rollback */
+   public void afterRollback(Transaction tx)
+   {
+   };
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-12 14:52:13 UTC (rev 9778)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-10-12 18:17:07 UTC (rev 9779)
@@ -1543,6 +1543,15 @@
          
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
+       */
+      public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list