[hornetq-commits] JBoss hornetq SVN: r10675 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue May 17 00:37:19 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-17 00:37:18 -0400 (Tue, 17 May 2011)
New Revision: 10675

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Paging optimizations and test fixes

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-05-17 03:39:56 UTC (rev 10674)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-05-17 04:37:18 UTC (rev 10675)
@@ -121,6 +121,8 @@
 
    private final Runnable deliverRunner = new DeliverRunner();
 
+   private volatile boolean depagePending = false;
+   
    private final Runnable depageRunner = new DepageRunner();
 
    private final StorageManager storageManager;
@@ -837,10 +839,18 @@
    {
       if (expiryAddress != null)
       {
+         if (isTrace)
+         {
+            log.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName(), new Exception ("trace"));
+         }
          move(expiryAddress, ref, true, false);
       }
       else
       {
+         if (isTrace)
+         {
+            log.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
+         }
          acknowledge(ref);
       }
    }
@@ -1605,12 +1615,20 @@
 
    private void scheduleDepage()
    {
-      pageSubscription.getExecutor().execute(depageRunner);
+      if (!depagePending)
+      {
+         if (isTrace)
+         {
+            log.trace("Scheduling depage for queue " + this.getName());
+         }
+         depagePending = true;
+         pageSubscription.getExecutor().execute(depageRunner);
+      }
    }
 
    private void depage()
    {
-      if (paused || pageIterator == null || consumerList.isEmpty())
+      if (paused || pageIterator == null)
       {
          return;
       }
@@ -1618,13 +1636,15 @@
       long maxSize = pageSubscription.getPagingStore().getPageSizeBytes();
 
       // System.out.println("QueueMemorySize before depage = " + queueMemorySize.get());
+      int depaged = 0;
       while (queueMemorySize.get() < maxSize && pageIterator.hasNext())
       {
+         depaged++;
          PagedReference reference = pageIterator.next();
          addTail(reference, false);
          pageIterator.remove();
       }
-      // System.out.println("QueueMemorySize after depage = " + queueMemorySize.get() + " depaged " + nmessages);
+      log.debug("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
 
       deliverAsync();
    }
@@ -1898,6 +1918,10 @@
    {
       if (reference.getMessage().isExpired())
       {
+         if (isTrace)
+         {
+            log.trace("Reference " + reference + " is expired");
+         }
          reference.handled();
 
          try
@@ -2191,6 +2215,7 @@
       {
          try
          {
+            depagePending = false;
             depage();
          }
          catch (Exception e)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-17 03:39:56 UTC (rev 10674)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-17 04:37:18 UTC (rev 10675)
@@ -3844,13 +3844,12 @@
          
          session.start();
          
-         ClientConsumer cons = session.createConsumer(ADDRESS);
+         ClientConsumer consAddr = session.createConsumer(ADDRESS);
          
-         assertNull(cons.receive(1000));
+         assertNull(consAddr.receive(1000));
          
-         cons.close();
          
-         cons = session.createConsumer("DLA");
+         ClientConsumer cons = session.createConsumer("DLA");
 
          for (int i = 0; i < 500; i++)
          {



More information about the hornetq-commits mailing list