[hornetq-commits] JBoss hornetq SVN: r9876 - in branches/Branch_New_Paging/src/main/org/hornetq/core: paging/cursor/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 11 12:28:05 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-11 12:28:05 -0500 (Thu, 11 Nov 2010)
New Revision: 9876

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
Log:
Scheduling on paging

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-11-11 17:28:05 UTC (rev 9876)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.paging.cursor;
 
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.LinkedListIterator;
@@ -111,4 +112,11 @@
    void setQueue(Queue queue);
    
    Queue getQueue();
+   
+   /**
+    * To be used to requery the reference case the Garbage Collection removed it from the PagedReference as it's using WeakReferences
+    * @param pos
+    * @return
+    */
+   PagedMessage queryMessage(PagePosition pos);
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2010-11-11 17:28:05 UTC (rev 9876)
@@ -13,6 +13,9 @@
 
 package org.hornetq.core.paging.cursor;
 
+import java.lang.ref.WeakReference;
+
+import org.hornetq.api.core.Message;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -33,18 +36,32 @@
 
    private final PagePosition position;
 
-   private final PagedMessage message;
+   private WeakReference<PagedMessage> message;
    
+   private Long deliveryTime = null;
+   
    private final PageSubscription subscription;
 
    public ServerMessage getMessage()
    {
-      return message.getMessage();
+      return getPagedMessage().getMessage();
    }
 
-   public PagedMessage getPagedMessage()
+   public synchronized PagedMessage getPagedMessage()
    {
-      return message;
+      PagedMessage returnMessage = message.get();
+      
+      // We only keep a few references on the Queue from paging...
+      // Besides those references are SoftReferenced on page cache...
+      // So, this will unlikely be null, 
+      // unless the Queue has stalled for some time after paging
+      if (returnMessage == null)
+      {
+         // reference is gone, we will reconstruct it
+         returnMessage = subscription.queryMessage(position);
+         message = new WeakReference<PagedMessage>(returnMessage);
+      }
+      return returnMessage;
    }
 
    public PagePosition getPosition()
@@ -55,7 +72,7 @@
    public PagedReferenceImpl(final PagePosition position, final PagedMessage message, final PageSubscription subscription)
    {
       this.position = position;
-      this.message = message;
+      this.message = new WeakReference<PagedMessage>(message);
       this.subscription = subscription;
    }
 
@@ -78,8 +95,19 @@
     */
    public long getScheduledDeliveryTime()
    {
-      // TODO Auto-generated method stub
-      return 0;
+      if (deliveryTime == null)
+      {
+         ServerMessage msg = getMessage();
+         if (msg.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+         {
+            deliveryTime = getMessage().getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+         }
+         else
+         {
+            deliveryTime = 0l;
+         }
+      }
+      return deliveryTime;
    }
 
    /* (non-Javadoc)
@@ -87,8 +115,7 @@
     */
    public void setScheduledDeliveryTime(final long scheduledDeliveryTime)
    {
-      // TODO Auto-generated method stub
-
+      deliveryTime = scheduledDeliveryTime;
    }
 
    /* (non-Javadoc)

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-11 17:28:05 UTC (rev 9876)
@@ -522,9 +522,10 @@
          // the page stays locked until the entire reading is finished
          if (needToRead)
          {
+            Page page = null;
             try
             {
-               Page page = pagingStore.createPage((int)pageId);
+               page = pagingStore.createPage((int)pageId);
 
                page.open();
 
@@ -540,6 +541,16 @@
             }
             finally
             {
+               try
+               {
+                  if (page != null)
+                  {
+                     page.close();
+                  }
+               }
+               catch (Throwable ignored)
+               {
+               }
                cache.unlock();
             }
          }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-11 17:28:05 UTC (rev 9876)
@@ -32,6 +32,7 @@
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.cursor.PageCache;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
@@ -464,6 +465,23 @@
       }
    }
 
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.cursor.PageSubscription#queryMessage(org.hornetq.core.paging.cursor.PagePosition)
+    */
+   public PagedMessage queryMessage(PagePosition pos)
+   {
+      try
+      {
+         return cursorProvider.getMessage(pos);
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+
+
    /** 
     * Theres no need to synchronize this method as it's only called from journal load on startup
     */

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-11 14:44:07 UTC (rev 9875)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-11 17:28:05 UTC (rev 9876)
@@ -874,49 +874,8 @@
          if (store.page(message, context, entry.getValue()))
          {
             
-            if (tx != null)
-            {
-               PageDelivery delivery = (PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
-               if (delivery == null)
-               {
-                  delivery = new PageDelivery();
-                  tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
-                  tx.addOperation(delivery);
-               }
-               
-               delivery.addQueues(entry.getValue().getDurableQueues());
-               delivery.addQueues(entry.getValue().getNonDurableQueues());
-            }
-            else
-            {
-
-               List<Queue> durableQueues = entry.getValue().getDurableQueues();
-               List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
-               
-               final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
-               
-               queues.addAll(durableQueues);
-               queues.addAll(nonDurableQueues);
-
-               storageManager.afterCompleteOperations(new IOAsyncTask()
-               {
-                  
-                  public void onError(int errorCode, String errorMessage)
-                  {
-                  }
-                  
-                  public void done()
-                  {
-                     for (Queue queue : queues)
-                     {
-                        // in case of paging, we need to kick asynchronous delivery to try delivering
-                        queue.deliverAsync();
-                     }
-                  }
-               });
-            }
-            
-            
+            // We need to kick delivery so the Queues may check for the cursors case they are empty
+            schedulePageDelivery(tx, entry);
             continue;
          }
    
@@ -1023,6 +982,56 @@
    }
 
    /**
+    * This will kick a delivery async on the queue, so the queue may have a chance to depage messages
+    * @param tx
+    * @param entry
+    */
+   private void schedulePageDelivery(Transaction tx, Map.Entry<SimpleString, RouteContextList> entry)
+   {
+      if (tx != null)
+      {
+         PageDelivery delivery = (PageDelivery)tx.getProperty(TransactionPropertyIndexes.PAGE_DELIVERY);
+         if (delivery == null)
+         {
+            delivery = new PageDelivery();
+            tx.putProperty(TransactionPropertyIndexes.PAGE_DELIVERY, delivery);
+            tx.addOperation(delivery);
+         }
+         
+         delivery.addQueues(entry.getValue().getDurableQueues());
+         delivery.addQueues(entry.getValue().getNonDurableQueues());
+      }
+      else
+      {
+
+         List<Queue> durableQueues = entry.getValue().getDurableQueues();
+         List<Queue> nonDurableQueues = entry.getValue().getNonDurableQueues();
+         
+         final List<Queue> queues = new ArrayList<Queue>(durableQueues.size() + nonDurableQueues.size());
+         
+         queues.addAll(durableQueues);
+         queues.addAll(nonDurableQueues);
+
+         storageManager.afterCompleteOperations(new IOAsyncTask()
+         {
+            
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+            
+            public void done()
+            {
+               for (Queue queue : queues)
+               {
+                  // in case of paging, we need to kick asynchronous delivery to try delivering
+                  queue.deliverAsync();
+               }
+            }
+         });
+      }
+   }
+
+   /**
     * @param refs
     */
    private void addReferences(final List<MessageReference> refs, final boolean direct)



More information about the hornetq-commits mailing list