[hornetq-commits] JBoss hornetq SVN: r9866 - in branches/Branch_New_Paging/src/main/org/hornetq/core: postoffice/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 10 14:25:04 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-10 14:25:04 -0500 (Wed, 10 Nov 2010)
New Revision: 9866

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
Log:
async deliveries after paging

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-10 17:28:31 UTC (rev 9865)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-10 19:25:04 UTC (rev 9866)
@@ -948,8 +948,10 @@
             // This will force everything to be persisted
             message.bodyChanged();
          }
+         
+         Transaction tx = ctx.getTransaction();
 
-         pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(ctx));
+         pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(tx));
 
          int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 
@@ -989,9 +991,8 @@
       return ids;
    }
    
-   private long getTransactionID(RoutingContext ctx) throws Exception
+   private long getTransactionID(Transaction tx) throws Exception
    {
-      Transaction tx = ctx.getTransaction();
       if (tx == null)
       {
          return 0l;

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-10 17:28:31 UTC (rev 9865)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-10 19:25:04 UTC (rev 9866)
@@ -15,9 +15,11 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -59,6 +61,7 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionOperation;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
 import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.impl.TransactionImpl;
@@ -834,6 +837,28 @@
          processRoute(message, context, false);
       }
    }
+   
+   
+   private class PageDelivery extends TransactionOperationAbstract
+   {
+      private Set<Queue> queues = new HashSet<Queue>();
+      
+      public void addQueues(List<Queue> queueList)
+      {
+         queues.addAll(queueList);
+      }
+      
+      public void afterCommit(Transaction tx)
+      {
+         // We need to try delivering async after paging, or nothing may start a delivery after paging since nothing is going towards the queues
+         // The queue will try to depage case it's empty
+         for (Queue queue : queues)
+         {
+            queue.deliverAsync();
+         }
+      }
+      
+   }
 
    private void processRoute(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
    {
@@ -848,6 +873,50 @@
          
          if (store.page(message, context, entry.getValue()))
          {
+            
+            if (tx != null)
+            {
+               PageDelivery delivery = (PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
+               if (delivery == null)
+               {
+                  delivery = new PageDelivery();
+                  tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
+                  tx.addOperation(delivery);
+               }
+               
+               delivery.addQueues(entry.getValue().getDurableQueues());
+               delivery.addQueues(entry.getValue().getNonDurableQueues());
+            }
+            else
+            {
+
+               List<Queue> durableQueues = entry.getValue().getDurableQueues();
+               List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
+               
+               final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
+               
+               queues.addAll(durableQueues);
+               queues.addAll(nonDurableQueues);
+
+               storageManager.afterCompleteOperations(new IOAsyncTask()
+               {
+                  
+                  public void onError(int errorCode, String errorMessage)
+                  {
+                  }
+                  
+                  public void done()
+                  {
+                     for (Queue queue : queues)
+                     {
+                        // in case of paging, we need to kick asynchronous delivery to try delivering
+                        queue.deliverAsync();
+                     }
+                  }
+               });
+            }
+            
+            
             continue;
          }
    

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java	2010-11-10 17:28:31 UTC (rev 9865)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionOperationAbstract.java	2010-11-10 19:25:04 UTC (rev 9866)
@@ -50,16 +50,16 @@
    /** After commit shouldn't throw any exception. Any verification has to be done on before commit */
    public void afterCommit(Transaction tx)
    {
-   };
+   }
 
    public void beforeRollback(Transaction tx) throws Exception
    {
-   };
+   }
 
    /** After rollback shouldn't throw any exception. Any verification has to be done on before rollback */
    public void afterRollback(Transaction tx)
    {
-   };
+   }
 
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-11-10 17:28:31 UTC (rev 9865)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-11-10 19:25:04 UTC (rev 9866)
@@ -30,7 +30,7 @@
 
    public static final int REFS_OPERATION = 6;
 
-   public static final int PAGE_MESSAGES_OPERATION = 7;
+   public static final int PAGE_DELIVERY = 7;
    
    public static final int PAGE_CURSOR_POSITIONS = 8;
 }



More information about the hornetq-commits mailing list