[hornetq-commits] JBoss hornetq SVN: r9858 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 9 14:27:39 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-09 14:27:38 -0500 (Tue, 09 Nov 2010)
New Revision: 9858

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
commit before a small refactoring

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PageCursorProvider.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -39,6 +39,8 @@
 
    PageCache getPageCache(PagePosition pos);
    
+   PagedReference newReference(final PagePosition pos, final PagedMessage msg);
+   
    void addPageCache(PageCache cache);
 
    PagingStore getAssociatedStore();
@@ -52,7 +54,7 @@
    
    PageSubscription createSubscription(long queueId, Filter filter, boolean durable);
    
-   PagedReferenceImpl getNext(PageSubscription cursor, PagePosition pos) throws Exception;
+   PagedReference getNext(PageSubscription cursor, PagePosition pos) throws Exception;
    
    PagedMessage getMessage(PagePosition pos) throws Exception;
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -17,6 +17,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
 
 /**
  * A InternalReference
@@ -33,7 +34,11 @@
    private PagePosition a;
    private PagedMessage b;
    
+   private Queue queue;
    
+   private PageSubscription subscription;
+   
+   
    public ServerMessage getMessage()
    {
       return b.getMessage();
@@ -140,4 +145,22 @@
       // TODO Auto-generated method stub
       
    }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#acknowledge()
+    */
+   public void acknowledge() throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction)
+    */
+   public void acknowledge(Transaction tx) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -30,6 +30,7 @@
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.PageSubscription;
+import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.ServerMessage;
@@ -122,12 +123,12 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursorProvider#getAfter(org.hornetq.core.paging.cursor.PagePosition)
     */
-   public PagedReferenceImpl getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
+   public PagedReference getNext(final PageSubscription cursor, PagePosition cursorPos) throws Exception
    {
 
       while (true)
       {
-         PagedReferenceImpl retPos = internalGetNext(cursorPos);
+         PagedReference retPos = internalGetNext(cursorPos);
 
          if (retPos == null)
          {
@@ -182,7 +183,7 @@
       return false;
    }
 
-   private PagedReferenceImpl internalGetNext(final PagePosition pos)
+   private PagedReference internalGetNext(final PagePosition pos)
    {
       PagePosition retPos = pos.nextMessage();
 
@@ -209,7 +210,7 @@
 
       if (serverMessage != null)
       {
-         return new PagedReferenceImpl(retPos, cache.getMessage(retPos.getMessageNr()));
+         return newReference(retPos, serverMessage);
       }
       else
       {
@@ -229,6 +230,11 @@
 
       return cache.getMessage(pos.getMessageNr());
    }
+   
+   public PagedReference newReference(final PagePosition pos, final PagedMessage msg)
+   {
+      return new PagedReferenceImpl(pos, msg);
+   }
 
    /**
     * No need to synchronize this method since the private getPageCache will have a synchronized call

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -177,7 +177,7 @@
       
       /** next element taken on hasNext test.
        *  it has to be delivered on next next operation */
-      PagedReferenceImpl cachedNext;
+      PagedReference cachedNext;
 
       public void repeat()
       {
@@ -201,13 +201,14 @@
       /* (non-Javadoc)
        * @see java.util.Iterator#next()
        */
-      public synchronized PagedReferenceImpl next()
+      public synchronized PagedReference next()
       {
          
          if (cachedNext != null)
          {
-            PagedReferenceImpl retPos = cachedNext;
+            PagedReference retPos = cachedNext;
             cachedNext = null;
+            System.out.println("Returning cached next " + retPos);
             return retPos;
          }
          
@@ -215,8 +216,9 @@
          {
             if (redeliveryIterator.hasNext())
             {
+               // There's a redelivery pending, we will get it out of that pool instead
                isredelivery = true;
-               return getMessage(redeliveryIterator.next());
+               return getReference(redeliveryIterator.next());
             }
             else
             {
@@ -228,7 +230,7 @@
                position = getStartPosition();
             }
 
-            PagedReferenceImpl nextPos = moveNext(position);
+            PagedReference nextPos = moveNext(position);
             if (nextPos != null)
             {
                lastOperation = position;
@@ -278,9 +280,9 @@
       }
    }
 
-   private PagedReferenceImpl getMessage(PagePosition pos) throws Exception
+   private PagedReference getReference(PagePosition pos) throws Exception
    {
-      return new PagedReferenceImpl(pos, cursorProvider.getMessage(pos));
+      return cursorProvider.newReference(pos, cursorProvider.getMessage(pos));
    }
 
    /* (non-Javadoc)
@@ -294,11 +296,11 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
     */
-   public synchronized PagedReferenceImpl moveNext(PagePosition position) throws Exception
+   public synchronized PagedReference moveNext(PagePosition position) throws Exception
    {
       boolean match = false;
 
-      PagedReferenceImpl message = null;
+      PagedReference message = null;
 
       PagePosition tmpPosition = position;
 
@@ -307,12 +309,16 @@
          message = cursorProvider.getNext(this, tmpPosition);
          
          boolean valid = true;
+         
          if (message == null)
          {
             valid = false;
          }
          else
          {
+            // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
+            // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing 
+            // is being changed. That's why the false is passed as a parameter here
             PageCursorInfo info = getPageInfo(message.getPosition(), false);
             if (info != null && info.isRemoved(message.getPosition()))
             {
@@ -847,6 +853,11 @@
 
    // Inner classes -------------------------------------------------
 
+   /** 
+    * This will hold information about the pending ACKs towards a page.
+    * This instance will be released as soon as the entire page is consumed, releasing the memory at that point
+    * The ref counts are increased also when a message is ignored for any reason.
+    * */
    private class PageCursorInfo
    {
       // Number of messages existent on this page

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/MessageReference.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.server;
 
+import org.hornetq.core.transaction.Transaction;
+
 /**
  * A reference to a message.
  * 
@@ -51,6 +53,11 @@
    void decrementDeliveryCount();
 
    Queue getQueue();
+   
+   void acknowledge() throws Exception;
+   
+   void acknowledge(final Transaction tx) throws Exception;
 
+
    void handled();
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -29,6 +29,7 @@
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.Transaction;
 
 /**
  * A queue that will discard messages if a newer message with the same MessageImpl.HDR_LAST_VALUE_NAME property value.
@@ -92,7 +93,7 @@
 
             try
             {
-               super.acknowledge(oldRef);
+               oldRef.acknowledge();
             }
             catch (Exception e)
             {
@@ -233,5 +234,21 @@
       {
          return false;
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.server.MessageReference)
+       */
+      public void acknowledge() throws Exception
+      {
+         ref.acknowledge();
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+       */
+      public void acknowledge(Transaction tx) throws Exception
+      {
+         ref.acknowledge(tx);
+      }
    }
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -17,6 +17,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.MemorySize;
 
 /**
@@ -150,6 +151,23 @@
       return false;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.server.MessageReference)
+    */
+   public void acknowledge() throws Exception
+   {
+      queue.acknowledge(this);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#acknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+    */
+   public void acknowledge(Transaction tx) throws Exception
+   {
+      queue.acknowledge(tx, this);
+   }
+
+
    // Public --------------------------------------------------------
 
    @Override
@@ -159,7 +177,6 @@
              "]:" +
              (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE");
    }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -675,32 +675,46 @@
 
    public void acknowledge(final MessageReference ref) throws Exception
    {
-      ServerMessage message = ref.getMessage();
-
-      boolean durableRef = message.isDurable() && durable;
-
-      if (durableRef)
+      if (ref.isPaged())
       {
-         storageManager.storeAcknowledge(id, message.getMessageID());
+         pageSubscription.ack((PagedReference)ref);
       }
+      else
+      {
+         ServerMessage message = ref.getMessage();
+   
+         boolean durableRef = message.isDurable() && durable;
+   
+         if (durableRef)
+         {
+            storageManager.storeAcknowledge(id, message.getMessageID());
+         }
+         postAcknowledge(ref);
+      }
 
-      postAcknowledge(ref);
    }
 
    public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception
    {
-      ServerMessage message = ref.getMessage();
-
-      boolean durableRef = message.isDurable() && durable;
-
-      if (durableRef)
+      if (ref.isPaged())
       {
-         storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
-
-         tx.setContainsPersistent();
+         pageSubscription.ackTx(tx, (PagedReference)ref);
       }
-
-      getRefsOperation(tx).addAck(ref);
+      else
+      {
+         ServerMessage message = ref.getMessage();
+   
+         boolean durableRef = message.isDurable() && durable;
+   
+         if (durableRef)
+         {
+            storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
+   
+            tx.setContainsPersistent();
+         }
+   
+         getRefsOperation(tx).addAck(ref);
+      }
    }
 
    public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -521,11 +521,11 @@
 
          if (autoCommitAcks || tx == null)
          {
-            ref.getQueue().acknowledge(ref);
+            ref.acknowledge();
          }
          else
          {
-            ref.getQueue().acknowledge(tx, ref);
+            ref.acknowledge(tx);
          }
       }
       while (ref.getMessage().getMessageID() != messageID);

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-09 04:23:26 UTC (rev 9857)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-09 19:27:38 UTC (rev 9858)
@@ -315,6 +315,8 @@
 
    private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
    {
+      
+      System.out.println("PageDir:" + getPageDir());
       clearData();
 
       Configuration config = createDefaultConfig();



More information about the hornetq-commits mailing list