[hornetq-commits] JBoss hornetq SVN: r9862 - in branches/Branch_New_Paging/src/main/org/hornetq/core: server/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 9 18:58:11 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-09 18:58:11 -0500 (Tue, 09 Nov 2010)
New Revision: 9862

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
changes

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-09 23:38:48 UTC (rev 9861)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-09 23:58:11 UTC (rev 9862)
@@ -51,6 +51,7 @@
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -988,7 +989,7 @@
       return ids;
    }
    
-   private long getTransactionID(RoutingContext ctx)
+   private long getTransactionID(RoutingContext ctx) throws Exception
    {
       Transaction tx = ctx.getTransaction();
       if (tx == null)
@@ -997,10 +998,68 @@
       }
       else
       {
+         if (tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION) == null)
+         {
+            PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+            System.out.println("Creating pageTransaction " + pgTX.getTransactionID());
+            storageManager.storePageTransaction(tx.getID(), pgTX);
+            pagingManager.addTransaction(pgTX);
+            tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
+            tx.addOperation(new FinishPageMessageOperation());
+            
+            tx.setContainsPersistent();
+         }
+         
          return tx.getID();
       }
    }
 
+   
+   private static class FinishPageMessageOperation implements TransactionOperation
+   {
+
+      public void afterCommit(final Transaction tx)
+      {
+         // If part of the transaction goes to the queue and part goes to paging, we must not let depaging start for
+         // this transaction until all of its messages have been added to the queue;
+         // otherwise the messages could be delivered out of order.
+
+         PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+         if (pageTransaction != null)
+         {
+            pageTransaction.commit();
+         }
+      }
+
+      public void afterPrepare(final Transaction tx)
+      {
+      }
+
+      public void afterRollback(final Transaction tx)
+      {
+         PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+         if (tx.getState() == State.PREPARED && pageTransaction != null)
+         {
+            pageTransaction.rollback();
+         }
+      }
+
+      public void beforeCommit(final Transaction tx) throws Exception
+      {
+      }
+
+      public void beforePrepare(final Transaction tx) throws Exception
+      {
+      }
+
+      public void beforeRollback(final Transaction tx) throws Exception
+      {
+      }
+
+   }
+
    /**
     * This method will remove files from the page system and and route them, doing it transactionally
     *     

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-09 23:38:48 UTC (rev 9861)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-09 23:58:11 UTC (rev 9862)
@@ -1267,13 +1267,20 @@
       {
          return;
       }
-
-      int nmessages = 0;
-      while (nmessages < MAX_DELIVERIES_IN_LOOP && pageIterator.hasNext())
+      
+      int msgsToDeliver = MAX_DELIVERIES_IN_LOOP - (messageReferences.size() + getScheduledCount() + concurrentQueue.size());
+      
+      if (msgsToDeliver > 0)
       {
-         nmessages ++;
-         addTail(pageIterator.next(), false);
-         pageIterator.remove();
+         System.out.println("Depaging " + msgsToDeliver + " messages");
+   
+         int nmessages = 0;
+         while (nmessages < msgsToDeliver && pageIterator.hasNext())
+         {
+            nmessages ++;
+            addTail(pageIterator.next(), false);
+            pageIterator.remove();
+         }
       }
       
       deliverAsync();



More information about the hornetq-commits mailing list