[hornetq-commits] JBoss hornetq SVN: r10181 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Feb 4 18:25:01 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-04 18:25:01 -0500 (Fri, 04 Feb 2011)
New Revision: 10181

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Improvements on non-blocking paging

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2011-02-04 23:25:01 UTC (rev 10181)
@@ -14,6 +14,7 @@
 package org.hornetq.core.paging.cursor;
 
 import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.utils.LinkedListIterator;
@@ -30,6 +31,8 @@
 
    // Cursor query operations --------------------------------------
 
+   PagingStore getPagingStore();
+   
    // To be called before the server is down
    void stop();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-02-04 23:25:01 UTC (rev 10181)
@@ -49,7 +49,7 @@
 
    public synchronized PagedMessage getPagedMessage()
    {
-      PagedMessage returnMessage = message.get();
+      PagedMessage returnMessage = message != null ? message.get() : null;
       
       // We only keep a few references on the Queue from paging...
       // Besides those references are SoftReferenced on page cache...

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-02-04 23:25:01 UTC (rev 10181)
@@ -69,11 +69,11 @@
 
    // Attributes ----------------------------------------------------
 
-   private final boolean isTrace = false; // PageCursorImpl.log.isTraceEnabled();
+   private final boolean isTrace = PageSubscriptionImpl.log.isTraceEnabled();
 
    private static void trace(final String message)
    {
-      PageSubscriptionImpl.log.info(message);
+	      PageSubscriptionImpl.log.trace(message);
    }
 
    private volatile boolean autoCleanup = true;
@@ -131,6 +131,11 @@
 
    // Public --------------------------------------------------------
 
+   public PagingStore getPagingStore()
+   {
+      return pageStore;
+   }
+
    public Queue getQueue()
    {
       return queue;
@@ -534,6 +539,7 @@
     */
    public void reloadPreparedACK(final Transaction tx, final PagePosition position)
    {
+      deliveredCount.incrementAndGet();
       installTXCallback(tx, position);
    }
 
@@ -777,6 +783,8 @@
          // It needs to persist, otherwise the cursor will return to the fist page position
          tx.setContainsPersistent();
       }
+      
+      getPageInfo(position).remove(position);
 
       PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
 
@@ -903,6 +911,7 @@
       public void decrementPendingTX()
       {
          pendingTX.decrementAndGet();
+         checkDone();
       }
 
       public boolean isRemoved(final PagePosition pos)
@@ -928,17 +937,26 @@
                                        ", page = " +
                                        pageId);
          }
-
+         
          // Negative could mean a bookmark on the first element for the page (example -1)
          if (posACK.getMessageNr() >= 0)
          {
-            if (getNumberOfMessages() == confirmed.incrementAndGet())
-            {
-               onPageDone(this);
-            }
+            confirmed.incrementAndGet();
+            checkDone();
          }
       }
 
+      /**
+       * 
+       */
+      protected void checkDone()
+      {
+         if (isDone())
+         {
+            onPageDone(this);
+         }
+      }
+
       private int getNumberOfMessages()
       {
          if (wasLive)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-02-04 23:25:01 UTC (rev 10181)
@@ -56,6 +56,7 @@
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
+import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.persistence.GroupingInfo;
@@ -1715,6 +1716,7 @@
                   if (sub != null)
                   {
                      sub.reloadPreparedACK(tx, encoding.position);
+                     referencesToAck.add(new PagedReferenceImpl(encoding.position, null, sub));
                   }
                   else
                   {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-04 23:25:01 UTC (rev 10181)
@@ -32,6 +32,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PagedReference;
 import org.hornetq.core.persistence.StorageManager;
@@ -103,6 +104,9 @@
    
    // The quantity of pagedReferences on messageREferences priority list
    private final AtomicInteger pagedReferences = new AtomicInteger(0);
+   
+   // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
+   private final AtomicInteger queueMemorySize = new AtomicInteger(0);
 
    private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
 
@@ -327,6 +331,7 @@
 
    public synchronized void reload(final MessageReference ref)
    {
+      queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
       if (!scheduledDeliveryHandler.checkAndSchedule(ref))
       {
          internalAddTail(ref);
@@ -375,6 +380,8 @@
       {
          return;
       }
+      
+      queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
 
       concurrentQueue.add(ref);
 
@@ -1214,22 +1221,18 @@
     */
    private void internalAddTail(final MessageReference ref)
    {
-      if (ref.isPaged())
-      {
-         pagedReferences.incrementAndGet();
-      }
+      refAdded(ref);
       messageReferences.addTail(ref, ref.getMessage().getPriority());
    }
 
+
    /**
     * @param ref
     */
    private void internalAddHead(final MessageReference ref)
    {
-      if (ref.isPaged())
-      {
-         pagedReferences.incrementAndGet();
-      }
+      queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+      refAdded(ref);
       messageReferences.addHead(ref, ref.getMessage().getPriority());
    }
 
@@ -1392,12 +1395,25 @@
     */
    private void refRemoved(MessageReference ref)
    {
+      queueMemorySize.addAndGet(-ref.getMessage().getMemoryEstimate());
       if (ref.isPaged())
       {
          pagedReferences.decrementAndGet();
       }
    }
    
+   /**
+    * @param ref
+    */
+   protected void refAdded(final MessageReference ref)
+   {
+      if (ref.isPaged())
+      {
+         pagedReferences.incrementAndGet();
+      }
+   }
+
+   
    private void scheduleDepage()
    {
       executor.execute(depageRunner);
@@ -1405,33 +1421,23 @@
    
    private void depage()
    {
-      if (paused || consumerList.isEmpty())
+      if (paused || pageIterator == null || consumerList.isEmpty())
       {
          return;
       }
       
-      int msgsToDeliver = MAX_DELIVERIES_IN_LOOP - (messageReferences.size() + getScheduledCount() + concurrentQueue.size());
+      long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
       
-      if (msgsToDeliver > 0)
+      //System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+      while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
       {
-         //System.out.println("Depaging " + msgsToDeliver + " messages");
-         //System.out.println("Depage "  + msgsToDeliver + " now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
-   
-         int nmessages = 0;
-         while (nmessages < msgsToDeliver && pageIterator.hasNext())
-         {
-            nmessages ++;
-            addTail(pageIterator.next(), false);
-            pageIterator.remove();
-         }
-         
-         //System.out.println("Depaged " + nmessages);
+         PagedReference reference = pageIterator.next();
+         addTail(reference, false);
+         pageIterator.remove();
       }
-//      else
-//      {
-//         System.out.println("Depaging not being done now.. there are msgRef = " + messageReferences.size() + " scheduled = " + getScheduledCount() + " concurrentQueue.size() = " + concurrentQueue.size());
-//      }
+      //System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
       
+      
       deliverAsync();
    }
 
@@ -1737,13 +1743,26 @@
 
    private void postAcknowledge(final MessageReference ref)
    {
+      QueueImpl queue = (QueueImpl)ref.getQueue();
+
+      queue.deliveringCount.decrementAndGet();
+      
+      if (queue.deliveringCount.get() < 0)
+      {
+         new Exception("DeliveringCount became negative").printStackTrace();
+      }
+
+      if (ref.isPaged())
+      {
+         // nothing to be done
+         return;
+      }
+
       final ServerMessage message = ref.getMessage();
 
-      QueueImpl queue = (QueueImpl)ref.getQueue();
-
       boolean durableRef = message.isDurable() && queue.durable;
 
-      if (durableRef && ! ref.isPaged())
+      if (durableRef)
       {
          int count = message.decrementDurableRefCount();
 
@@ -1775,13 +1794,6 @@
          }
       }
 
-      queue.deliveringCount.decrementAndGet();
-      
-      if (queue.deliveringCount.get() < 0)
-      {
-         new Exception("DeliveringCount became negative").printStackTrace();
-      }
-
       try
       {
          message.decrementRefCount();
@@ -1827,10 +1839,19 @@
    private final class RefsOperation implements TransactionOperation
    {
       List<MessageReference> refsToAck = new ArrayList<MessageReference>();
+      List<ServerMessage> pagedMessagesToPostACK = null;
 
       synchronized void addAck(final MessageReference ref)
       {
          refsToAck.add(ref);
+         if (ref.isPaged())
+         {
+            if (pagedMessagesToPostACK == null)
+            {
+               pagedMessagesToPostACK = new ArrayList<ServerMessage>();
+            }
+            pagedMessagesToPostACK.add(ref.getMessage());
+         }
       }
 
       public void beforeCommit(final Transaction tx) throws Exception
@@ -1891,6 +1912,21 @@
                postAcknowledge(ref);
             }
          }
+         
+         if (pagedMessagesToPostACK != null)
+         {
+            for (ServerMessage msg : pagedMessagesToPostACK)
+            {
+               try
+               {
+                  msg.decrementRefCount();
+               }
+               catch (Exception e)
+               {
+                  log.warn(e.getMessage(), e);
+               }
+            }
+         }
       }
 
       public void beforePrepare(final Transaction tx) throws Exception

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-02-04 21:52:52 UTC (rev 10180)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-02-04 23:25:01 UTC (rev 10181)
@@ -112,11 +112,19 @@
       super.tearDown();
    }
 
+   public void testRepeat() throws Exception
+   {
+      for (int i = 0; i < 100; i++)
+      {
+         System.out.println(" ####################### test " + i);
+         testPreparePersistent();
+         tearDown();
+         setUp();
+      }
+   }
+
    public void testPreparePersistent() throws Exception
    {
-      boolean persistentMessages = true;
-
-      System.out.println("PageDir:" + getPageDir());
       clearData();
 
       Configuration config = createDefaultConfig();
@@ -133,8 +141,12 @@
 
       final int messageSize = 1024;
 
-      final int numberOfMessages = 10000;
+      final int numberOfMessages = 5000;
 
+      final int numberOfTX = 10;
+
+      final int messagesPerTX = numberOfMessages / numberOfTX;
+
       try
       {
          ServerLocator locator = createInVMNonHALocator();
@@ -164,7 +176,7 @@
 
          for (int i = 0; i < numberOfMessages; i++)
          {
-            message = session.createMessage(persistentMessages);
+            message = session.createMessage(true);
 
             HornetQBuffer bodyLocal = message.getBodyBuffer();
 
@@ -204,7 +216,7 @@
          LinkedList<Xid> xids = new LinkedList<Xid>();
 
          int msgReceived = 0;
-         for (int i = 0; i < numberOfMessages / 999; i++)
+         for (int i = 0; i < numberOfTX; i++)
          {
             ClientSession sessionConsumer = sf.createSession(true, false, false);
             Xid xid = newXID();
@@ -212,14 +224,14 @@
             sessionConsumer.start(xid, XAResource.TMNOFLAGS);
             sessionConsumer.start();
             ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
-            for (int msgCount = 0; msgCount < 1000; i++)
+            for (int msgCount = 0; msgCount < messagesPerTX; msgCount++)
             {
                if (msgReceived == numberOfMessages)
                {
                   break;
                }
-               System.out.println("MsgReceived = " + (msgReceived++));
-               ClientMessage msg = consumer.receive(5000);
+               msgReceived++;
+               ClientMessage msg = consumer.receive(10000);
                assertNotNull(msg);
                msg.acknowledge();
             }
@@ -236,8 +248,6 @@
 
          sessionCheck.close();
 
-         System.out.println(queue.getMessagesAdded());
-
          assertEquals(numberOfMessages, queue.getMessageCount());
 
          sf.close();
@@ -262,25 +272,75 @@
          consumer = session.createConsumer(PagingTest.ADDRESS);
 
          session.start();
+         
+         
+         assertEquals(numberOfMessages, queue.getMessageCount());
 
-         assertNull(consumer.receiveImmediate());
+         ClientMessage msg = consumer.receive(5000);
+         if (msg != null)
+         {
+            System.out.println("Msg " + msg.getIntProperty("id"));
 
-         for (Xid xid : xids)
+            while (true)
+            {
+               ClientMessage msg2 = consumer.receive(1000);
+               if (msg2 == null)
+               {
+                  break;
+               }
+               System.out.println("Msg received again : " + msg2.getIntProperty("id"));
+
+            }
+         }
+         assertNull(msg);
+
+         for (int i = xids.size() -1 ; i >= 0; i--)
          {
+            Xid xid = xids.get(i);
             session.rollback(xid);
          }
+         System.out.println("msgCount = " + queue.getMessageCount());
 
          xids.clear();
 
-         assertNotNull(consumer.receiveImmediate());
+         session.close();
 
+         session = sf.createSession(false, false, false);
+
+         session.start();
+         
+         consumer = session.createConsumer(PagingTest.ADDRESS);
+         
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            msg = consumer.receive(1000);
+            assertNotNull(msg);
+            msg.acknowledge();
+            
+            assertEquals(i, msg.getIntProperty("id").intValue());
+            
+            if (i % 500 == 0)
+            {
+               session.commit();
+            }
+         }
+         
+         session.commit();
+         
          session.close();
 
          sf.close();
 
          locator.close();
 
-         queue.getMessageCount();
+         assertEquals(0, queue.getMessageCount());
+         
+         long timeout = System.currentTimeMillis() + 5000;
+         while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+         {
+            Thread.sleep(100);
+         }
+         assertFalse (queue.getPageSubscription().getPagingStore().isPaging());
          // assertEquals(numberOfMessages, queue.getMessageCount());
       }
       finally
@@ -331,7 +391,10 @@
          ClientSession session = sf.createSession(false, false, false);
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
-         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("-invalid"), new SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER), true);
+         session.createQueue(PagingTest.ADDRESS,
+                             PagingTest.ADDRESS.concat("-invalid"),
+                             new SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER),
+                             true);
 
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
 
@@ -374,16 +437,16 @@
                session.commit();
             }
          }
-         
+
          session.commit();
-         
+
          session.commit();
 
          session.commit();
-         
+
          PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
          store.getCursorProvier().cleanup();
-         
+
          long timeout = System.currentTimeMillis() + 5000;
          while (store.isPaging() && timeout > System.currentTimeMillis())
          {
@@ -402,7 +465,7 @@
          try
          {
             server.stop();
-           // System.exit(-1);
+            // System.exit(-1);
          }
          catch (Throwable ignored)
          {
@@ -1792,6 +1855,10 @@
          for (int i = 0; i < numberOfMessages; i++)
          {
             System.out.println("Received " + i);
+            if (i == 55)
+            {
+               System.out.println("i = 55");
+            }
             ClientMessage msg = consumer.receive(5000);
             Assert.assertNotNull(msg);
             msg.acknowledge();



More information about the hornetq-commits mailing list