[hornetq-commits] JBoss hornetq SVN: r12087 - in branches/Branch_2_2_AS7: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 6 21:46:06 EST 2012


Author: clebert.suconic at jboss.com
Date: 2012-02-06 21:46:06 -0500 (Mon, 06 Feb 2012)
New Revision: 12087

Modified:
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
HORNETQ-843 - fixing expiry on Paging

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java	2012-02-06 14:29:24 UTC (rev 12086)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/server/impl/QueueImpl.java	2012-02-07 02:46:06 UTC (rev 12087)
@@ -1223,8 +1223,10 @@
                try
                {
                   boolean expired = false;
+                  boolean hasElements = false;
                   while (iter.hasNext())
                   {
+                     hasElements = true;
                      MessageReference ref = iter.next();
                      try
                      {
@@ -1243,7 +1245,8 @@
                      }
                   }
                   
-                  if (expired && pageIterator != null && pageIterator.hasNext())
+                  // If empty we need to schedule depaging to make sure we would depage expired messages as well
+                  if ((!hasElements || expired && pageIterator != null) && pageIterator.hasNext())
                   {
                      scheduleDepage();
                   }

Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2012-02-06 14:29:24 UTC (rev 12086)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2012-02-07 02:46:06 UTC (rev 12087)
@@ -138,6 +138,7 @@
       backupConfig.setSharedStore(true);
       backupConfig.setBackup(true);
       backupConfig.setClustered(true);
+      backupConfig.setMessageExpiryScanPeriod(100);
       TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
       TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
       backupConfig.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
@@ -156,6 +157,7 @@
       liveConfig.setSecurityEnabled(false);
       liveConfig.setSharedStore(true);
       liveConfig.setClustered(true);
+      liveConfig.setMessageExpiryScanPeriod(100);
       List<String> pairs = null;
       ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
             pairs, false);

Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2012-02-06 14:29:24 UTC (rev 12086)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2012-02-07 02:46:06 UTC (rev 12087)
@@ -31,6 +31,7 @@
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
@@ -202,6 +203,68 @@
       }
    }
 
+   /**
+    * Sends persistent messages that carry a one-second expiration, crashes the live server,
+    * and then waits (up to 60 seconds) for the queue located on the backup server to report
+    * that its page subscription is no longer paging.
+    *
+    * @throws Exception if any client or server operation fails
+    */
+   public void testExpireMessage() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setReconnectAttempts(-1);
+
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      ClientSession session = sf.createSession(true, true, 0);
+
+      try
+      {
+
+         session.createQueue(PagingFailoverTest.ADDRESS, PagingFailoverTest.ADDRESS, true);
+
+         ClientProducer prod = session.createProducer(PagingFailoverTest.ADDRESS);
+
+         final int TOTAL_MESSAGES = 1000;
+
+         for (int i = 0; i < TOTAL_MESSAGES; i++)
+         {
+            ClientMessage msg = session.createMessage(true);
+            msg.putIntProperty(new SimpleString("key"), i);
+            // Each message expires one second after it is created.
+            msg.setExpiration(System.currentTimeMillis() + 1000);
+            prod.send(msg);
+         }
+
+         crash(session);
+
+         session.close();
+
+         Queue queue = backupServer.getServer().locateQueue(ADDRESS);
+
+         // Poll every 100 ms, for at most 60 seconds, until the subscription stops paging.
+         long deadline = System.currentTimeMillis() + 60000;
+         System.out.println("Starting now");
+         while (deadline > System.currentTimeMillis() && queue.getPageSubscription().isPaging())
+         {
+            Thread.sleep(100);
+         }
+
+         // Let a failure propagate to the test runner: calling System.exit() here would kill
+         // the whole test JVM, hide the failure from the report and skip the cleanup below.
+         assertFalse("Queue is still paging after waiting for the messages to expire",
+                     queue.getPageSubscription().isPaging());
+
+      }
+      finally
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Exception ignored)
+         {
+            // Best-effort cleanup: the session may already be closed or broken by the crash.
+         }
+      }
+   }
+
    /**
     * @param session
     * @param latch



More information about the hornetq-commits mailing list