[hornetq-commits] JBoss hornetq SVN: r9881 - branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 12 11:56:31 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-12 11:56:31 -0500 (Fri, 12 Nov 2010)
New Revision: 9881

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
Log:
fix TX Ordering on paging

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-12 06:51:54 UTC (rev 9880)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-12 16:56:31 UTC (rev 9881)
@@ -291,94 +291,6 @@
       return new CursorIterator();
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
-    */
-   public synchronized PagedReference moveNext(PagePosition position) throws Exception
-   {
-      boolean match = false;
-
-      PagedReference message = null;
-
-      PagePosition tmpPosition = position;
-
-      do
-      {
-         message = internalGetNext(tmpPosition);
-         
-
-         if (message == null)
-         {
-            break;
-         }
-         
-         tmpPosition = message.getPosition();
-
-         boolean valid = true;
-         boolean ignored = false;
-
-         // Validate the scenarios where the message should be considered not valid even to be considered
-
-         // 1st... is it routed?
-
-         valid = routed(message.getPagedMessage());
-         if (!valid) ignored = true;
-
-         // 2nd ... if TX, is it committed?
-         if (valid && message.getPagedMessage().getTransactionID() != 0)
-         {
-            PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
-                                                                                       .getTransactionID());
-            if (tx == null)
-            {
-               log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
-                        ", ignoring message on position " +
-                        message.getPosition());
-               valid = false;
-               ignored = true;
-            }
-            else
-            {
-               if (tx.deliverAfterCommit(this, message.getPosition()))
-               {
-                  valid = false;
-                  ignored = false;
-               }
-            }
-         }
-
-         // 3rd... was it previously removed?
-         if (valid)
-         {
-            // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
-            // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
-            // is being changed. That's why the false is passed as a parameter here
-            PageCursorInfo info = getPageInfo(message.getPosition(), false);
-            if (info != null && info.isRemoved(message.getPosition()))
-            {
-               valid = false;
-            }
-         }
-
-         if (valid)
-         {
-            match = match(message.getMessage());
-
-            if (!match)
-            {
-               processACK(message.getPosition());
-            }
-         }
-         else if (ignored)
-         {
-            positionIgnored(message.getPosition());
-         }
-      }
-      while (message != null && !match);
-
-      return message;
-   }
-
    private PagedReference internalGetNext(final PagePosition pos)
    {
       PagePosition retPos = pos.nextMessage();
@@ -1121,10 +1033,11 @@
                position = getStartPosition();
             }
 
-            PagedReference nextPos = moveNext(position);
+            PagePosition previousPos = position;
+            PagedReference nextPos = moveNext();
             if (nextPos != null)
             {
-               lastOperation = position;
+               lastOperation = previousPos;
                position = nextPos.getPosition();
             }
             return nextPos;
@@ -1134,7 +1047,101 @@
             throw new RuntimeException(e.getMessage(), e);
          }
       }
+      
+      /* (non-Javadoc)
+       * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
+       */
+      public synchronized PagedReference moveNext() throws Exception
+      {
+         boolean match = false;
 
+         PagedReference message = null;
+
+         PagePosition tmpPosition = position;
+
+         do
+         {
+            message = internalGetNext(tmpPosition);
+            
+
+            if (message == null)
+            {
+               break;
+            }
+            
+            tmpPosition = message.getPosition();
+
+            boolean valid = true;
+            boolean ignored = false;
+
+            // Validate the scenarios where the message should be considered not valid even to be considered
+
+            // 1st... is it routed?
+
+            valid = routed(message.getPagedMessage());
+            if (!valid) ignored = true;
+
+            // 2nd ... if TX, is it committed?
+            if (valid && message.getPagedMessage().getTransactionID() != 0)
+            {
+               PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
+                                                                                          .getTransactionID());
+               if (tx == null)
+               {
+                  log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
+                           ", ignoring message on position " +
+                           message.getPosition());
+                  valid = false;
+                  ignored = true;
+               }
+               else
+               {
+                  if (tx.deliverAfterCommit(PageSubscriptionImpl.this, message.getPosition()))
+                  {
+                     valid = false;
+                     ignored = false;
+                  }
+               }
+            }
+
+            // 3rd... was it previously removed?
+            if (valid)
+            {
+               // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
+               // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
+               // is being changed. That's why the false is passed as a parameter here
+               PageCursorInfo info = getPageInfo(message.getPosition(), false);
+               if (info != null && info.isRemoved(message.getPosition()))
+               {
+                  valid = false;
+               }
+            }
+            
+            if (!ignored)
+            {
+               position = message.getPosition();
+            }
+
+            if (valid)
+            {
+               match = match(message.getMessage());
+
+               if (!match)
+               {
+                  processACK(message.getPosition());
+               }
+            }
+            else if (ignored)
+            {
+               positionIgnored(message.getPosition());
+            }
+         }
+         while (message != null && !match);
+
+         return message;
+      }
+
+
       /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. 
        *  It would be a rare race condition but I would prefer avoiding that scenario */
       public synchronized boolean hasNext()



More information about the hornetq-commits mailing list