[hornetq-commits] JBoss hornetq SVN: r9870 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 10 21:09:13 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-10 21:09:12 -0500 (Wed, 10 Nov 2010)
New Revision: 9870

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
update page transactions

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -42,12 +42,16 @@
    
    void store(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
    
-   void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx, int depages) throws Exception;
+   void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
+   
+   void storeUpdate(StorageManager storageManager, PagingManager pagingManager) throws Exception;
 
    // To be used after the update was stored or reload
    void onUpdate(int update, StorageManager storageManager, PagingManager pagingManager);
 
    void increment();
+   
+   void increment(int size);
 
    int getNumberOfMessages();
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -31,6 +31,7 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -164,133 +165,115 @@
       ack(position);
    }
 
-   class CursorIterator implements LinkedListIterator<PagedReference>
+   public void scheduleCleanupCheck()
    {
-      private PagePosition position = null;
-
-      private PagePosition lastOperation = null;
-
-      private final LinkedListIterator<PagePosition> redeliveryIterator;
-
-      private volatile boolean isredelivery = false;
-      
-      /** next element taken on hasNext test.
-       *  it has to be delivered on next next operation */
-      private volatile PagedReference cachedNext;
-      
-      public CursorIterator()
+      if (autoCleanup)
       {
-         synchronized (redeliveries)
+         executor.execute(new Runnable()
          {
-            redeliveryIterator = redeliveries.iterator();
-         }
-      }
-      
 
-      public void repeat()
-      {
-         if (isredelivery)
-         {
-            synchronized (redeliveries)
+            public void run()
             {
-               redeliveryIterator.repeat();
+               try
+               {
+                  cleanupEntries();
+               }
+               catch (Exception e)
+               {
+                  PageSubscriptionImpl.log.warn("Error on cleaning up cursor pages", e);
+               }
             }
-         }
-         else
-         {
-            if (lastOperation == null)
-            {
-               position = null;
-            }
-            else
-            {
-               position = lastOperation;
-            }
-         }
+         });
       }
+   }
 
-      /* (non-Javadoc)
-       * @see java.util.Iterator#next()
-       */
-      public synchronized PagedReference next()
+   /** 
+    * It will cleanup all the records for completed pages
+    * */
+   public void cleanupEntries() throws Exception
+   {
+      Transaction tx = new TransactionImpl(store);
+
+      boolean persist = false;
+
+      final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
+
+      // First get the completed pages using a lock
+      synchronized (this)
       {
-         
-         if (cachedNext != null)
+         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
          {
-            PagedReference retPos = cachedNext;
-            cachedNext = null;
-            return retPos;
-         }
-         
-         try
-         {
-            synchronized (redeliveries)
+            PageCursorInfo info = entry.getValue();
+            if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
             {
-               if (redeliveryIterator.hasNext())
+               if (entry.getKey() == lastAckedPosition.getPageNr())
                {
-                  // There's a redelivery pending, we will get it out of that pool instead
-                  isredelivery = true;
-                  return getReference(redeliveryIterator.next());
+                  PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
                }
                else
                {
-                  isredelivery = false;
+                  info.setPendingDelete();
+                  completedPages.add(entry.getValue());
                }
             }
-            
-            if (position == null)
-            {
-               position = getStartPosition();
-            }
+         }
+      }
 
-            PagedReference nextPos = moveNext(position);
-            if (nextPos != null)
+      for (int i = 0; i < completedPages.size(); i++)
+      {
+         PageCursorInfo info = completedPages.get(i);
+
+         for (PagePosition pos : info.acks)
+         {
+            if (pos.getRecordID() > 0)
             {
-               lastOperation = position;
-               position = nextPos.getPosition();
+               store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
+               if (!persist)
+               {
+                  // only need to set it once
+                  tx.setContainsPersistent();
+                  persist = true;
+               }
             }
-            return nextPos;
          }
-         catch (Exception e)
-         {
-            throw new RuntimeException(e.getMessage(), e);
-         }
       }
 
-      /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. 
-       *  It would be a rare race condition but I would prefer avoiding that scenario */
-      public synchronized boolean hasNext()
+      tx.addOperation(new TransactionOperationAbstract()
       {
-         // if an unbehaved program called hasNext twice before next, we only cache it once.
-         if (cachedNext != null)
+
+         @Override
+         public void afterCommit(final Transaction tx)
          {
-            return true;
+            executor.execute(new Runnable()
+            {
+
+               public void run()
+               {
+                  synchronized (PageSubscriptionImpl.this)
+                  {
+                     for (PageCursorInfo completePage : completedPages)
+                     {
+                        if (isTrace)
+                        {
+                           PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
+                        }
+                        if (consumedPages.remove(completePage.getPageId()) == null)
+                        {
+                           PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
+                                                   " from consumed pages on cursor for address " +
+                                                   pageStore.getAddress());
+                        }
+                     }
+                  }
+
+                  cursorProvider.scheduleCleanup();
+               }
+            });
          }
-         
-         if (!pageStore.isPaging())
-         {
-            return false;
-         }
-         
-         cachedNext = next();
+      });
 
-         return cachedNext != null;
-      }
+      tx.commit();
 
-      /* (non-Javadoc)
-       * @see java.util.Iterator#remove()
-       */
-      public void remove()
-      {
-         PageSubscriptionImpl.this.getPageInfo(position).remove(position);
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.utils.LinkedListIterator#close()
-       */
-      public void close()
-      {
-      }
    }
 
    private PagedReference getReference(PagePosition pos) throws Exception
@@ -396,12 +379,41 @@
       return new PagePositionImpl(pageStore.getFirstPage(), -1);
    }
 
+   public void ackTx(final Transaction tx, final PagePosition position) throws Exception
+   {
+      // if the cursor is persistent
+      if (persistent)
+      {
+         store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
+      }
+      installTXCallback(tx, position);
+
+   }
+
+
+   public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
+   {
+      ackTx(tx, reference.getPosition());
+      
+      PageTransactionInfo txInfo = getPageTransaction(reference);
+      if (txInfo != null)
+      {
+         txInfo.storeUpdate(store, pageStore.getPagingManager(), tx);
+      }
+   }
+
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#confirm(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public void ack(final PagedReference position) throws Exception
+   public void ack(final PagedReference reference) throws Exception
    {
-      ack(position.getPosition());
+      ack(reference.getPosition());
+      PageTransactionInfo txInfo = getPageTransaction(reference);
+      if (txInfo != null)
+      {
+         txInfo.storeUpdate(this.store, pageStore.getPagingManager());
+      }
    }
    
    public void ack(final PagePosition position) throws Exception
@@ -426,23 +438,6 @@
       });
    }
 
-   public void ackTx(final Transaction tx, final PagePosition position) throws Exception
-   {
-      // if the cursor is persistent
-      if (persistent)
-      {
-         store.storeCursorAcknowledgeTransactional(tx.getID(), cursorId, position);
-      }
-      installTXCallback(tx, position);
-
-   }
-
-
-   public void ackTx(final Transaction tx, final PagedReference position) throws Exception
-   {
-      ackTx(tx, position.getPosition());
-   }
-
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#getFirstPage()
     */
@@ -743,6 +738,18 @@
       cursorTX.addPositionConfirmation(this, position);
 
    }
+   
+   private PageTransactionInfo getPageTransaction(final PagedReference reference)
+   {
+      if (reference.getPagedMessage().getTransactionID() != 0)
+      {
+         return pageStore.getPagingManager().getTransaction(reference.getPagedMessage().getTransactionID());
+      }
+      else
+      {
+         return null;
+      }
+   }
 
    /**
     *  A callback from the PageCursorInfo. It will be called when all the messages on a page have been acked
@@ -755,118 +762,6 @@
          scheduleCleanupCheck();
       }
    }
-
-   public void scheduleCleanupCheck()
-   {
-      if (autoCleanup)
-      {
-         executor.execute(new Runnable()
-         {
-
-            public void run()
-            {
-               try
-               {
-                  cleanupEntries();
-               }
-               catch (Exception e)
-               {
-                  PageSubscriptionImpl.log.warn("Error on cleaning up cursor pages", e);
-               }
-            }
-         });
-      }
-   }
-
-   /** 
-    * It will cleanup all the records for completed pages
-    * */
-   public void cleanupEntries() throws Exception
-   {
-      Transaction tx = new TransactionImpl(store);
-
-      boolean persist = false;
-
-      final ArrayList<PageCursorInfo> completedPages = new ArrayList<PageCursorInfo>();
-
-      // First get the completed pages using a lock
-      synchronized (this)
-      {
-         for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
-         {
-            PageCursorInfo info = entry.getValue();
-            if (info.isDone() && !info.isPendingDelete() && lastAckedPosition != null)
-            {
-               if (entry.getKey() == lastAckedPosition.getPageNr())
-               {
-                  PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() + " now since it's the current page");
-               }
-               else
-               {
-                  info.setPendingDelete();
-                  completedPages.add(entry.getValue());
-               }
-            }
-         }
-      }
-
-      for (int i = 0; i < completedPages.size(); i++)
-      {
-         PageCursorInfo info = completedPages.get(i);
-
-         for (PagePosition pos : info.acks)
-         {
-            if (pos.getRecordID() > 0)
-            {
-               store.deleteCursorAcknowledgeTransactional(tx.getID(), pos.getRecordID());
-               if (!persist)
-               {
-                  // only need to set it once
-                  tx.setContainsPersistent();
-                  persist = true;
-               }
-            }
-         }
-      }
-
-      tx.addOperation(new TransactionOperationAbstract()
-      {
-
-         @Override
-         public void afterCommit(final Transaction tx)
-         {
-            executor.execute(new Runnable()
-            {
-
-               public void run()
-               {
-                  synchronized (PageSubscriptionImpl.this)
-                  {
-                     for (PageCursorInfo completePage : completedPages)
-                     {
-                        if (isTrace)
-                        {
-                           PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
-                        }
-                        if (consumedPages.remove(completePage.getPageId()) == null)
-                        {
-                           PageSubscriptionImpl.log.warn("Couldn't remove page " + completePage.getPageId() +
-                                                   " from consumed pages on cursor for address " +
-                                                   pageStore.getAddress());
-                        }
-                     }
-                  }
-
-                  cursorProvider.scheduleCleanup();
-               }
-            });
-         }
-      });
-
-      tx.commit();
-
-   }
-
    // Inner classes -------------------------------------------------
 
    /** 
@@ -1038,6 +933,137 @@
 
          }
       }
+   }
+   
 
+   class CursorIterator implements LinkedListIterator<PagedReference>
+   {
+      private PagePosition position = null;
+
+      private PagePosition lastOperation = null;
+
+      private final LinkedListIterator<PagePosition> redeliveryIterator;
+
+      private volatile boolean isredelivery = false;
+      
+      /** next element taken on hasNext test.
+       *  it has to be delivered on next next operation */
+      private volatile PagedReference cachedNext;
+      
+      public CursorIterator()
+      {
+         synchronized (redeliveries)
+         {
+            redeliveryIterator = redeliveries.iterator();
+         }
+      }
+      
+
+      public void repeat()
+      {
+         if (isredelivery)
+         {
+            synchronized (redeliveries)
+            {
+               redeliveryIterator.repeat();
+            }
+         }
+         else
+         {
+            if (lastOperation == null)
+            {
+               position = null;
+            }
+            else
+            {
+               position = lastOperation;
+            }
+         }
+      }
+
+      /* (non-Javadoc)
+       * @see java.util.Iterator#next()
+       */
+      public synchronized PagedReference next()
+      {
+         
+         if (cachedNext != null)
+         {
+            PagedReference retPos = cachedNext;
+            cachedNext = null;
+            return retPos;
+         }
+         
+         try
+         {
+            synchronized (redeliveries)
+            {
+               if (redeliveryIterator.hasNext())
+               {
+                  // There's a redelivery pending, we will get it out of that pool instead
+                  isredelivery = true;
+                  return getReference(redeliveryIterator.next());
+               }
+               else
+               {
+                  isredelivery = false;
+               }
+            }
+            
+            if (position == null)
+            {
+               position = getStartPosition();
+            }
+
+            PagedReference nextPos = moveNext(position);
+            if (nextPos != null)
+            {
+               lastOperation = position;
+               position = nextPos.getPosition();
+            }
+            return nextPos;
+         }
+         catch (Exception e)
+         {
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+
+      /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. 
+       *  It would be a rare race condition but I would prefer avoiding that scenario */
+      public synchronized boolean hasNext()
+      {
+         // if an unbehaved program called hasNext twice before next, we only cache it once.
+         if (cachedNext != null)
+         {
+            return true;
+         }
+         
+         if (!pageStore.isPaging())
+         {
+            return false;
+         }
+         
+         cachedNext = next();
+
+         return cachedNext != null;
+      }
+
+      /* (non-Javadoc)
+       * @see java.util.Iterator#remove()
+       */
+      public void remove()
+      {
+         PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.utils.LinkedListIterator#close()
+       */
+      public void close()
+      {
+      }
    }
+
+   
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -13,12 +13,15 @@
 
 package org.hornetq.core.paging.impl;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.Pair;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagingManager;
@@ -27,6 +30,8 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
+import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.utils.DataConstants;
 
 /**
@@ -98,10 +103,7 @@
          {
             log.warn("Can't delete page transaction id=" + this.recordID);
          }
-      }
-      
-      if (sizeAfterUpdate == 0 && pagingManager != null)
-      {
+
          pagingManager.removeTransaction(this.transactionID);
       }
    }
@@ -110,6 +112,11 @@
    {
       numberOfMessages.incrementAndGet();
    }
+   
+   public void increment(final int size)
+   {
+      numberOfMessages.addAndGet(size);
+   }
 
    public int getNumberOfMessages()
    {
@@ -158,40 +165,37 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
     */
-   public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int depages) throws Exception
+   public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception
    {
-      storageManager.updatePageTransaction(tx.getID(), this, depages);
+      UpdatePageTXOperation pgtxUpdate = (UpdatePageTXOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE);
       
-      final PageTransactionInfo pgToUpdate = this;
+      if (pgtxUpdate == null)
+      {
+         pgtxUpdate = new UpdatePageTXOperation(storageManager, pagingManager);
+         tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE, pgtxUpdate);
+         tx.addOperation(pgtxUpdate);
+      }
       
-      tx.addOperation(new TransactionOperation()
+      pgtxUpdate.addUpdate(this);
+   }
+   
+   public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager) throws Exception
+   {
+      storageManager.updatePageTransaction(this, 1);
+      storageManager.afterCompleteOperations(new IOAsyncTask()
       {
-         public void beforeRollback(Transaction tx) throws Exception
+         public void onError(int errorCode, String errorMessage)
          {
          }
          
-         public void beforePrepare(Transaction tx) throws Exception
+         public void done()
          {
+            PageTransactionInfoImpl.this.onUpdate(1, storageManager, pagingManager);
          }
-         
-         public void beforeCommit(Transaction tx) throws Exception
-         {
-         }
-         
-         public void afterRollback(Transaction tx)
-         {
-         }
-         
-         public void afterPrepare(Transaction tx)
-         {
-         }
-         
-         public void afterCommit(Transaction tx)
-         {
-            pgToUpdate.onUpdate(depages, storageManager, pagingManager);
-         }
       });
    }
+   
+   
 
    public boolean isCommit()
    {
@@ -260,4 +264,68 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
+   
+   
+   static class UpdatePageTXOperation extends TransactionOperationAbstract
+   {
+      private HashMap<PageTransactionInfo, AtomicInteger> countsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
+      
+      private boolean stored = false;
+      
+      private final StorageManager storageManager;
+      
+      private final PagingManager pagingManager;
+      
+      public UpdatePageTXOperation(final StorageManager storageManager, final PagingManager pagingManager)
+      {
+         this.storageManager = storageManager;
+         this.pagingManager = pagingManager;
+      }
+      
+      public void addUpdate(PageTransactionInfo info)
+      {
+         AtomicInteger counter = countsToUpdate.get(info);
+         
+         if (counter == null)
+         {
+            counter = new AtomicInteger(0);
+            countsToUpdate.put(info, counter);
+         }
+         
+         counter.incrementAndGet();
+      }
+      
+      public void beforePrepare(Transaction tx) throws Exception
+      {
+         storeUpdates(tx);
+      }
+      
+      public void beforeCommit(Transaction tx) throws Exception
+      {
+         storeUpdates(tx);
+      }
+      
+      public void afterCommit(Transaction tx)
+      {
+         for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : countsToUpdate.entrySet())
+         {
+            entry.getKey().onUpdate(entry.getValue().intValue(), storageManager, pagingManager);
+         }
+      }
+      
+      private void storeUpdates(Transaction tx) throws Exception
+      {
+         if (!stored)
+         {
+            stored = true;
+            for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : countsToUpdate.entrySet())
+            {
+               storageManager.updatePageTransaction(tx.getID(), entry.getKey(), entry.getValue().get());
+            }
+         }
+      }
+      
+
+      
+   }
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -14,7 +14,6 @@
 package org.hornetq.core.paging.impl;
 
 import java.text.DecimalFormat;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,8 +50,8 @@
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.Transaction.State;
+import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.ExecutorFactory;
@@ -700,59 +699,6 @@
     * @return
     * @throws Exception
     */
-   protected boolean readPage() throws Exception
-   {
-      Page page = depage();
-
-      // It's important that only depage should happen while locked
-      // or we would be holding a lock for a long time
-      // The reading (IO part) should happen outside of any locks
-
-      if (page == null)
-      {
-         return false;
-      }
-
-      page.open();
-
-      List<PagedMessage> messages = null;
-
-      try
-      {
-         messages = page.read();
-      }
-      finally
-      {
-         try
-         {
-            page.close();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
-      if (onDepage(page.getPageId(), storeName, messages))
-      {
-         if (page.delete())
-         {
-            // DuplicateCache could be null during replication
-            // however the deletes on the journal will happen through replicated journal
-            if (duplicateCache != null)
-            {
-               duplicateCache.deleteFromCache(generateDuplicateID(page.getPageId()));
-            }
-         }
-
-         return true;
-      }
-      else
-      {
-         return false;
-      }
-
-   }
-
    private Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
 
    private class MemoryFreedRunnablesExecutor implements Runnable
@@ -951,7 +897,7 @@
          
          Transaction tx = ctx.getTransaction();
 
-         pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(tx));
+         pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(tx, listCtx));
 
          int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 
@@ -991,7 +937,7 @@
       return ids;
    }
    
-   private long getTransactionID(Transaction tx) throws Exception
+   private long getTransactionID(final Transaction tx, final RouteContextList listCtx) throws Exception
    {
       if (tx == null)
       {
@@ -999,34 +945,42 @@
       }
       else
       {
-         if (tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION) == null)
+         PageTransactionInfo pgTX = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+         if (pgTX == null)
          {
-            PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+            pgTX = new PageTransactionInfoImpl(tx.getID());
             System.out.println("Creating pageTransaction " + pgTX.getTransactionID());
-            storageManager.storePageTransaction(tx.getID(), pgTX);
             pagingManager.addTransaction(pgTX);
             tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
-            tx.addOperation(new FinishPageMessageOperation());
+            tx.addOperation(new FinishPageMessageOperation(pgTX));
             
             tx.setContainsPersistent();
          }
          
+         pgTX.increment(listCtx.getNumberOfQueues());
+         
          return tx.getID();
       }
    }
 
    
-   private static class FinishPageMessageOperation implements TransactionOperation
+   private class FinishPageMessageOperation implements TransactionOperation
    {
+      private final PageTransactionInfo pageTransaction;
+      
+      private boolean stored = false;
 
+      public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
+      {
+         this.pageTransaction = pageTransaction;
+      }
+      
       public void afterCommit(final Transaction tx)
       {
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
          // transaction until all the messages were added to the queue
          // or else we could deliver the messages out of order
 
-         PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
          if (pageTransaction != null)
          {
             pageTransaction.commit();
@@ -1039,8 +993,6 @@
 
       public void afterRollback(final Transaction tx)
       {
-         PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
          if (tx.getState() == State.PREPARED && pageTransaction != null)
          {
             pageTransaction.rollback();
@@ -1049,11 +1001,22 @@
 
       public void beforeCommit(final Transaction tx) throws Exception
       {
+         storePageTX(tx);
       }
 
       public void beforePrepare(final Transaction tx) throws Exception
       {
+         storePageTX(tx);
       }
+      
+      private void storePageTX(final Transaction tx) throws Exception
+      {
+         if (!stored)
+         {
+            storageManager.storePageTransaction(tx.getID(), pageTransaction);
+            stored = true;
+         }
+      }
 
       public void beforeRollback(final Transaction tx) throws Exception
       {
@@ -1067,157 +1030,7 @@
     * If persistent messages are also used, it will update eventual PageTransactions
     */
 
-   private boolean onDepage(final int pageId, final SimpleString address, final List<PagedMessage> pagedMessages) throws Exception
-   {
-      if (PagingStoreImpl.isTrace)
-      {
-         PagingStoreImpl.trace("Depaging....");
-      }
-
-      if (pagedMessages.size() == 0)
-      {
-         // nothing to be done on this case.
-         return true;
-      }
-
-      // Depage has to be done atomically, in case of failure it should be
-      // back to where it was
-
-      byte[] duplicateIdForPage = generateDuplicateID(pageId);
-
-      Transaction depageTransaction = new TransactionImpl(storageManager);
-
-      // DuplicateCache could be null during replication
-      if (duplicateCache != null)
-      {
-         if (duplicateCache.contains(duplicateIdForPage))
-         {
-            log.warn("Page " + pageId +
-                     " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
-            return true;
-         }
-
-         duplicateCache.addToCache(duplicateIdForPage, depageTransaction);
-      }
-
-      depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
-
-      HashMap<PageTransactionInfo, AtomicInteger> pageTransactionsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
-
-      for (PagedMessage pagedMessage : pagedMessages)
-      {
-         ServerMessage message = pagedMessage.getMessage();
-
-         if (message.isLargeMessage())
-         {
-            LargeServerMessage largeMsg = (LargeServerMessage)message;
-            if (!largeMsg.isFileExists())
-            {
-               PagingStoreImpl.log.warn("File for large message " + largeMsg.getMessageID() +
-                                        " doesn't exist, so ignoring depage for this large message");
-               continue;
-            }
-         }
-
-         final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-
-         PageTransactionInfo pageUserTransaction = null;
-         AtomicInteger countPageTX = null;
-
-         if (transactionIdDuringPaging >= 0)
-         {
-            pageUserTransaction = pagingManager.getTransaction(transactionIdDuringPaging);
-
-            if (pageUserTransaction == null)
-            {
-               // This is not supposed to happen
-               PagingStoreImpl.log.warn("Transaction " + pagedMessage.getTransactionID() +
-                                        " used during paging not found");
-               continue;
-            }
-            else
-            {
-               countPageTX = pageTransactionsToUpdate.get(pageUserTransaction);
-               if (countPageTX == null)
-               {
-                  countPageTX = new AtomicInteger();
-                  pageTransactionsToUpdate.put(pageUserTransaction, countPageTX);
-               }
-
-               // This is to avoid a race condition where messages are depaged
-               // before the commit arrived
-
-               while (running)
-               {
-                  // This is just to give us a chance to interrupt the process..
-                  // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
-                  // the shutdown of the server
-                  if (PagingStoreImpl.isTrace)
-                  {
-                     PagingStoreImpl.trace("Waiting pageTransaction to complete");
-                  }
-               }
-
-               if (!running)
-               {
-                  break;
-               }
-
-               if (!pageUserTransaction.isCommit())
-               {
-                  if (PagingStoreImpl.isTrace)
-                  {
-                     PagingStoreImpl.trace("Rollback was called after prepare, ignoring message " + message);
-                  }
-                  continue;
-               }
-            }
-
-         }
-
-         postOffice.route(message, depageTransaction, false);
-
-         // This means the page is duplicated. So we need to ignore this
-         if (depageTransaction.getState() == State.ROLLBACK_ONLY)
-         {
-            break;
-         }
-
-         // Update information about transactions
-         // This needs to be done after routing because of duplication detection
-         if (pageUserTransaction != null && message.isDurable())
-         {
-            countPageTX.incrementAndGet();
-         }
-      }
-
-      if (!running)
-      {
-         depageTransaction.rollback();
-         return false;
-      }
-
-      for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : pageTransactionsToUpdate.entrySet())
-      {
-         // This will set the journal transaction to commit;
-         depageTransaction.setContainsPersistent();
-
-         entry.getKey().storeUpdate(storageManager, this.pagingManager, depageTransaction, entry.getValue().intValue());
-      }
-
-      depageTransaction.commit();
-
-      storageManager.waitOnOperations();
-
-      if (PagingStoreImpl.isTrace)
-      {
-         PagingStoreImpl.trace("Depage committed, running = " + running);
-      }
-
-      return true;
-   }
-
-   /**
+    /**
     * @param pageId
     * @return
     */

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/StorageManager.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -142,6 +142,8 @@
    void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
    
    void updatePageTransaction(long txID, PageTransactionInfo pageTransaction,  int depage) throws Exception;
+   
+   void updatePageTransaction(PageTransactionInfo pageTransaction,  int depage) throws Exception;
 
    void deletePageTransactional(long recordID) throws Exception;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -606,6 +606,16 @@
                                                                               depages));
    }
 
+   public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
+   { // Overload that takes no txID: it calls appendUpdateRecord on messageJournal for the PAGE_TRANSACTION record type.
+      messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
+                                        JournalStorageManager.PAGE_TRANSACTION,
+                                        new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+                                                                 depages),
+                                        syncNonTransactional, // same flag is used to obtain the context on the next line
+                                        getContext(syncNonTransactional));
+   }
+
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
       messageJournal.appendUpdateRecordTransactional(txID,

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -479,4 +479,13 @@
       
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(org.hornetq.core.paging.PageTransactionInfo, int)
+    */
+   public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception
+   {
+      // No-op: the body is empty, so the update is ignored (left as generated stub - TODO confirm this is intended).
+      
+   }
+
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -24,6 +24,8 @@
  */
 public interface RouteContextList
 {
+   
+   int getNumberOfQueues();
 
    List<Queue> getDurableQueues();
    

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -1272,7 +1272,8 @@
       
       if (msgsToDeliver > 0)
       {
-         System.out.println("Depaging " + msgsToDeliver + " messages");
+         //System.out.println("Depaging " + msgsToDeliver + " messages");
+         System.out.println("Depage "  + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
    
          int nmessages = 0;
          while (nmessages < msgsToDeliver && pageIterator.hasNext())
@@ -1281,7 +1282,13 @@
             addTail(pageIterator.next(), false);
             pageIterator.remove();
          }
+         
+         System.out.println("Depaged " + nmessages);
       }
+      else
+      {
+         System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
+      }
       
       deliverAsync();
    }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -124,6 +124,11 @@
       private List<Queue> durableQueue = new ArrayList<Queue>(1);
       
       private List<Queue> nonDurableQueue = new ArrayList<Queue>(1);
+      
+      public int getNumberOfQueues()
+      { // Sum of the sizes of the durableQueue and nonDurableQueue lists held by this context list.
+         return durableQueue.size() + nonDurableQueue.size();
+      }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.server.RouteContextList#getDurableQueues()

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -24,8 +24,9 @@
  */
 public class TransactionPropertyIndexes
 {
-   public static final int IS_DEPAGE = 3;
 
+   public static final int PAGE_TRANSACTION_UPDATE = 4;
+   
    public static final int PAGE_TRANSACTION = 5;
 
    public static final int REFS_OPERATION = 6;

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -1012,37 +1012,6 @@
                   syncNonTransactional);
          }
 
-         protected boolean page(ServerMessage message, org.hornetq.core.server.RoutingContext ctx,  org.hornetq.core.server.RouteContextList listCtx, boolean sync) throws Exception
-         {
-            boolean paged = super.page(message, ctx, listCtx, sync);
-
-            if (paged)
-            {
-
-               if (countDepage.incrementAndGet() == 1)
-               {
-                  countDepage.set(0);
-
-                  executor.execute(new Runnable()
-                  {
-                     public void run()
-                     {
-                        try
-                        {
-                           while (isStarted() && readPage());
-                        }
-                        catch (Exception e)
-                        {
-                           e.printStackTrace();
-                        }
-                     }
-                  });
-               }
-            }
-
-            return paged;
-         }
-
          public boolean startDepaging()
          {
             // do nothing, we are hacking depage right in between paging
@@ -1306,8 +1275,9 @@
 
          for (int i = 0; i < numberOfMessages; i++)
          {
-            ClientMessage msg = consumer.receive(500000);
+            ClientMessage msg = consumer.receive(5000);
             assertNotNull(msg);
+            System.out.println("Received " + i);
             assertEquals(i, msg.getIntProperty("count").intValue());
             msg.acknowledge();
          }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-11-11 01:11:10 UTC (rev 9869)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-11-11 02:09:12 UTC (rev 9870)
@@ -1553,6 +1553,15 @@
          
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(org.hornetq.core.paging.PageTransactionInfo, int)
+       */
+      public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception
+      {
+         // No-op: the body is empty, so the update is ignored (left as generated stub - TODO confirm this is intended).
+         
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list