[hornetq-commits] JBoss hornetq SVN: r9882 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 12 14:37:53 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-12 14:37:52 -0500 (Fri, 12 Nov 2010)
New Revision: 9882

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
tweak on ordering

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-12 16:56:31 UTC (rev 9881)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2010-11-12 19:37:52 UTC (rev 9882)
@@ -296,7 +296,7 @@
       PagePosition retPos = pos.nextMessage();
 
       PageCache cache = cursorProvider.getPageCache(pos);
-      
+
       if (cache == null)
       {
          return null;
@@ -1014,40 +1014,19 @@
 
          try
          {
-            synchronized (redeliveries)
-            {
-               if (redeliveryIterator.hasNext())
-               {
-                  // There's a redelivery pending, we will get it out of that pool instead
-                  isredelivery = true;
-                  return getReference(redeliveryIterator.next());
-               }
-               else
-               {
-                  isredelivery = false;
-               }
-            }
-
             if (position == null)
             {
                position = getStartPosition();
             }
 
-            PagePosition previousPos = position;
-            PagedReference nextPos = moveNext();
-            if (nextPos != null)
-            {
-               lastOperation = previousPos;
-               position = nextPos.getPosition();
-            }
-            return nextPos;
+            return moveNext();
          }
          catch (Exception e)
          {
             throw new RuntimeException(e.getMessage(), e);
          }
       }
-      
+
       /* (non-Javadoc)
        * @see org.hornetq.core.paging.cursor.PageCursor#moveNext()
        */
@@ -1057,18 +1036,32 @@
 
          PagedReference message = null;
 
+         PagePosition lastPosition = position;
          PagePosition tmpPosition = position;
 
          do
          {
-            message = internalGetNext(tmpPosition);
-            
+            synchronized (redeliveries)
+            {
+               if (redeliveryIterator.hasNext())
+               {
+                  // There's a redelivery pending, we will get it out of that pool instead
+                  isredelivery = true;
+                  return getReference(redeliveryIterator.next());
+               }
+               else
+               {
+                  isredelivery = false;
+               }
 
+               message = internalGetNext(tmpPosition);
+            }
+
             if (message == null)
             {
                break;
             }
-            
+
             tmpPosition = message.getPosition();
 
             boolean valid = true;
@@ -1079,13 +1072,14 @@
             // 1st... is it routed?
 
             valid = routed(message.getPagedMessage());
-            if (!valid) ignored = true;
+            if (!valid)
+               ignored = true;
 
             // 2nd ... if TX, is it committed?
             if (valid && message.getPagedMessage().getTransactionID() != 0)
             {
                PageTransactionInfo tx = pageStore.getPagingManager().getTransaction(message.getPagedMessage()
-                                                                                          .getTransactionID());
+                                                                                           .getTransactionID());
                if (tx == null)
                {
                   log.warn("Couldn't locate page transaction " + message.getPagedMessage().getTransactionID() +
@@ -1108,7 +1102,8 @@
             if (valid)
             {
                // We don't create a PageCursorInfo unless we are doing a write operation (ack or removing)
-               // Say you have a Browser that will only read the files... there's no need to control PageCursors is nothing
+               // Say you have a Browser that will only read the files... there's no need to control PageCursors if
+               // nothing
                // is being changed. That's why the false is passed as a parameter here
                PageCursorInfo info = getPageInfo(message.getPosition(), false);
                if (info != null && info.isRemoved(message.getPosition()))
@@ -1116,7 +1111,7 @@
                   valid = false;
                }
             }
-            
+
             if (!ignored)
             {
                position = message.getPosition();
@@ -1138,10 +1133,14 @@
          }
          while (message != null && !match);
 
+         if (message != null)
+         {
+            lastOperation = lastPosition;
+         }
+
          return message;
       }
 
-
       /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be using next and hasNext as well. 
        *  It would be a rare race condition but I would prefer avoiding that scenario */
       public synchronized boolean hasNext()

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-12 16:56:31 UTC (rev 9881)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-11-12 19:37:52 UTC (rev 9882)
@@ -144,7 +144,6 @@
 
    public synchronized void commit()
    {
-      committed = true;
       if (lateDeliveries != null)
       {
          for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
@@ -153,6 +152,7 @@
          }
          lateDeliveries.clear();
       }
+      committed = true;
       lateDeliveries = null;
    }
 

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-12 16:56:31 UTC (rev 9881)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-12 19:37:52 UTC (rev 9882)
@@ -922,7 +922,7 @@
                      msg.putIntProperty("count", i);
                      producer.send(msg);
 
-                     if (i % 50 == 0 && i != 0)
+                     if (i % 100 == 0 && i != 0)
                      {
                         sessionProducer.commit();
                         // Thread.sleep(500);
@@ -967,16 +967,16 @@
 
          for (int i = 0; i < numberOfMessages; i++)
          {
-            ClientMessage msg = consumer.receive(500000);
+            ClientMessage msg = consumer.receive(5000);
             assertNotNull(msg);
             assertEquals(i, msg.getIntProperty("count").intValue());
             msg.acknowledge();
             if (i > 0 && i % 10 == 0)
             {
-               // session.commit();
+              session.commit();
             }
          }
-         // session.commit();
+         session.commit();
 
          session.close();
 



More information about the hornetq-commits mailing list