[hornetq-commits] JBoss hornetq SVN: r9884 - in branches/Branch_New_Paging/src/main/org/hornetq/core: paging/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 12 21:43:48 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-12 21:43:47 -0500 (Fri, 12 Nov 2010)
New Revision: 9884

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
tweaks

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-12 20:24:00 UTC (rev 9883)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-13 02:43:47 UTC (rev 9884)
@@ -209,8 +209,8 @@
             {
                if (entry.getKey() == lastAckedPosition.getPageNr())
                {
-                  PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
-                                             " now since it's the current page");
+                  // PageSubscriptionImpl.trace("We can't clear page " + entry.getKey() +
+                  // " now since it's the current page");
                }
                else
                {
@@ -1030,115 +1030,120 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
        */
-      public synchronized PagedReference moveNext() throws Exception
+      public PagedReference moveNext() throws Exception
       {
-         boolean match = false;
+         synchronized (PageSubscriptionImpl.this)
+         {
+            boolean match = false;
 
-         PagedReference message = null;
+            PagedReference message = null;
 
-         PagePosition lastPosition = position;
-         PagePosition tmpPosition = position;
+            PagePosition lastPosition = position;
+            PagePosition tmpPosition = position;
 
-         do
-         {
-            synchronized (redeliveries)
+            do
             {
-               if (redeliveryIterator.hasNext())
+               synchronized (redeliveries)
                {
-                  // There's a redelivery pending, we will get it out of that pool instead
-                  isredelivery = true;
-                  return getReference(redeliveryIterator.next());
+                  if (redeliveryIterator.hasNext())
+                  {
+                     // There's a redelivery pending, we will get it out of that pool instead
+                     isredelivery = true;
+                     PagedReference redeliveredMsg = getReference(redeliveryIterator.next());
+
+                     return redeliveredMsg;
+                  }
+                  else
+                  {
+                     isredelivery = false;
+                  }
+
+                  message = internalGetNext(tmpPosition);
                }
-               else
+
+               if (message == null)
                {
-                  isredelivery = false;
+                  break;
                }
 
-               message = internalGetNext(tmpPosition);
-            }
+               tmpPosition = message.getPosition();
 
-            if (message == null)
-            {
-               break;
-            }
+               boolean valid = true;
+               boolean ignored = false;
 
-            tmpPosition = message.getPosition();
+               // Validate the scenarios where the message should be considered not valid even to be considered
 
-            boolean valid = true;
-            boolean ignored = false;
+               // 1st... is it routed?
 
-            // Validate the scenarios where the message should be considered not valid even to be considered
+               valid = routed(message.getPagedMessage());
+               if (!valid)
+                  ignored = true;
 
-            // 1st... is it routed?
-
-            valid = routed(message.getPagedMessage());
-            if (!valid)
-               ignored = true;
-
-            // 2nd ... if TX, is it committed?
-            if (valid && message.getPagedMessage().getTransactionID() != 0)
-            {
-               PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
-                                                                                           .getTransactionID());
-               if (tx == null)
+               // 2nd ... if TX, is it committed?
+               if (valid && message.getPagedMessage().getTransactionID() != 0)
                {
-                  log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
-                           ", ignoring message on position " +
-                           message.getPosition());
-                  valid = false;
-                  ignored = true;
+                  PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
+                                                                                              .getTransactionID());
+                  if (tx == null)
+                  {
+                     log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
+                              ", ignoring message on position " +
+                              message.getPosition());
+                     valid = false;
+                     ignored = true;
+                  }
+                  else
+                  {
+                     if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
+                     {
+                        valid = false;
+                        ignored = false;
+                     }
+                  }
                }
-               else
+
+               // 3rd... was it previously removed?
+               if (valid)
                {
-                  if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
+                  // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
+                  // Say you have a Browser that will only read the files... there's no need to control PageCursors is
+                  // nothing
+                  // is being changed. That's why the false is passed as a parameter here
+                  PageCursorInfo info = getPageInfo(message.getPosition(), false);
+                  if (info != null && info.isRemoved(message.getPosition()))
                   {
                      valid = false;
-                     ignored = false;
                   }
                }
-            }
 
-            // 3rd... was it previously removed?
-            if (valid)
-            {
-               // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
-               // Say you have a Browser that will only read the files... there's no need to control PageCursors is
-               // nothing
-               // is being changed. That's why the false is passed as a parameter here
-               PageCursorInfo info = getPageInfo(message.getPosition(), false);
-               if (info != null && info.isRemoved(message.getPosition()))
+               if (!ignored)
                {
-                  valid = false;
+                  position = message.getPosition();
                }
-            }
 
-            if (!ignored)
-            {
-               position = message.getPosition();
-            }
+               if (valid)
+               {
+                  match = match(message.getMessage());
 
-            if (valid)
-            {
-               match = match(message.getMessage());
-
-               if (!match)
+                  if (!match)
+                  {
+                     processACK(message.getPosition());
+                  }
+               }
+               else if (ignored)
                {
-                  processACK(message.getPosition());
+                  positionIgnored(message.getPosition());
                }
             }
-            else if (ignored)
+            while (message != null && !match);
+
+            if (message != null)
             {
-               positionIgnored(message.getPosition());
+               lastOperation = lastPosition;
             }
-         }
-         while (message != null && !match);
 
-         if (message != null)
-         {
-            lastOperation = lastPosition;
+            return message;
          }
-
-         return message;
       }
 
       /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-12 20:24:00 UTC (rev 9883)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-13 02:43:47 UTC (rev 9884)
@@ -51,6 +51,8 @@
    private volatile long recordID = -1;
 
    private volatile boolean committed = false;
+   
+   private volatile boolean useRedelivery = false;
 
    private volatile boolean rolledback = false;
 
@@ -237,6 +239,12 @@
     */
    public synchronized boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos)
    {
+      if (committed && useRedelivery)
+      {
+         cursor.redeliver(cursorPos);
+         return true;
+      }
+      else
       if (committed)
       {
          return false;
@@ -249,6 +257,7 @@
       }
       else
       {
+         useRedelivery = true;
          if (lateDeliveries == null)
          {
             lateDeliveries = new LinkedList<Pair<PageSubscription, PagePosition>>();

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-12 20:24:00 UTC (rev 9883)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-13 02:43:47 UTC (rev 9884)
@@ -1273,7 +1273,7 @@
       if (msgsToDeliver > 0)
       {
          //System.out.println("Depaging " + msgsToDeliver + " messages");
-         System.out.println("Depage "  + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
+         //System.out.println("Depage "  + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
    
          int nmessages = 0;
          while (nmessages < msgsToDeliver && pageIterator.hasNext())
@@ -1283,12 +1283,12 @@
             pageIterator.remove();
          }
          
-         System.out.println("Depaged " + nmessages);
+         //System.out.println("Depaged " + nmessages);
       }
-      else
-      {
-         System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
-      }
+//      else
+//      {
+//         System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
+//      }
       
       deliverAsync();
    }



More information about the hornetq-commits mailing list