[hornetq-commits] JBoss hornetq SVN: r10463 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Apr 7 18:04:35 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-07 18:04:34 -0400 (Thu, 07 Apr 2011)
New Revision: 10463

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6273 - Scheduled Messages Issue

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-07 18:52:30 UTC (rev 10462)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-04-07 22:04:34 UTC (rev 10463)
@@ -1144,10 +1144,18 @@
          }
 
          Collection<AddMessageRecord> valueRecords = queueRecords.values();
+         
+         long currentTime = System.currentTimeMillis();
 
          for (AddMessageRecord record : valueRecords)
          {
             long scheduledDeliveryTime = record.scheduledDeliveryTime;
+            
+            if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime)
+            {
+               scheduledDeliveryTime = 0;
+               record.message.removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+            }
 
             if (scheduledDeliveryTime != 0)
             {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-04-07 18:52:30 UTC (rev 10462)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-04-07 22:04:34 UTC (rev 10463)
@@ -13,7 +13,18 @@
 
 package org.hornetq.tests.integration.cluster.distribution;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.MessageFlowRecord;
 import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
@@ -333,6 +344,118 @@
       verifyReceiveAll(20, 1);
    }
 
+   public void testRedistributeWithScheduling() throws Exception
+   {
+      setupCluster(false);
+
+      AddressSettings setting = new AddressSettings();
+      setting.setRedeliveryDelay(10000);
+      servers[0].getAddressSettingsRepository().addMatch("queues.testaddress", setting);
+      servers[0].getAddressSettingsRepository().addMatch("queue0", setting);
+      servers[1].getAddressSettingsRepository().addMatch("queue0", setting);
+      servers[1].getAddressSettingsRepository().addMatch("queues.testaddress", setting);
+      
+      startServers(0);
+      
+      setupSessionFactory(0, isNetty());
+      
+      createQueue(0, "queues.testaddress", "queue0", null, false);
+      
+      ClientSession session0 = sfs[0].createSession(false, false, false);
+      
+      ClientProducer prod0 = session0.createProducer("queues.testaddress");
+      
+      for (int i = 0 ; i < 100; i++)
+      {
+         ClientMessage msg = session0.createMessage(true);
+         msg.putIntProperty("key", i);
+         
+         byte[] bytes = new byte[24];
+         
+         ByteBuffer bb = ByteBuffer.wrap(bytes);
+         
+         bb.putLong((long)i);
+         
+         msg.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
+
+         prod0.send(msg);
+         
+         session0.commit();
+      }
+      
+      session0.close();
+      
+      session0 = sfs[0].createSession(true, false, false);
+
+      ClientConsumer consumer0 = session0.createConsumer("queue0");
+      
+      session0.start();
+      
+      ArrayList<Xid> xids = new ArrayList<Xid>();
+      
+      for (int i = 0 ; i < 100; i++)
+      {
+         Xid xid = newXID();
+         
+         session0.start(xid, XAResource.TMNOFLAGS);
+         
+         ClientMessage msg = consumer0.receive(5000);
+         
+         msg.acknowledge();
+         
+         session0.end(xid, XAResource.TMSUCCESS);
+         
+         session0.prepare(xid);
+         
+         xids.add(xid);
+      }
+      
+      session0.close();
+      
+      sfs[0].close();
+      sfs[0] = null;
+      
+      
+      startServers(0, 1, 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      createQueue(1, "queues.testaddress", "queue0", null, false);
+      createQueue(2, "queues.testaddress", "queue0", null, false);
+      
+      ClientSession session1 = sfs[1].createSession(false, false);
+      session1.start();
+      ClientConsumer consumer1 = session1.createConsumer("queue0");
+      
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+      waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 1, false);
+      waitForBindings(1, "queues.testaddress", 2, 0, false);
+      waitForBindings(2, "queues.testaddress", 2, 1, false);
+      
+      session0 = sfs[0].createSession(true, false, false);
+      
+      for (Xid xid: xids)
+      {
+         session0.rollback(xid);
+      }
+      
+      
+      for (int i = 0 ; i < 100; i++)
+      {
+         ClientMessage msg = consumer1.receive(15000);
+         assertNotNull(msg);
+         msg.acknowledge();
+      }
+      
+      session1.commit();
+      
+   }
+
    public void testRedistributionWhenConsumerIsClosedQueuesWithFilters() throws Exception
    {
       setupCluster(false);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java	2011-04-07 18:52:30 UTC (rev 10462)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java	2011-04-07 22:04:34 UTC (rev 10463)
@@ -12,6 +12,9 @@
  */
 package org.hornetq.tests.integration.scheduling;
 
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
@@ -32,6 +35,7 @@
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic">Clebert Suconic</a>
  */
 public class ScheduledMessageTest extends ServiceTestBase
 {
@@ -52,6 +56,14 @@
    {
       super.setUp();
       clearData();
+      startServer();
+   }
+
+   /**
+    * @throws Exception
+    */
+   protected void startServer() throws Exception
+   {
       configuration = createDefaultConfig();
       configuration.setSecurityEnabled(false);
       configuration.setJournalMinFiles(2);
@@ -64,7 +76,7 @@
    protected void tearDown() throws Exception
    {
       locator.close();
-      
+
       if (server != null)
       {
          try
@@ -282,10 +294,10 @@
       session.createQueue(atestq, atestq, null, true);
       ClientProducer producer = session.createProducer(atestq);
       ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                          false,
-                                                          0,
-                                                          System.currentTimeMillis(),
-                                                          (byte)1);
+                                                    false,
+                                                    0,
+                                                    System.currentTimeMillis(),
+                                                    (byte)1);
       message.getBodyBuffer().writeString("testINVMCoreClient");
       message.setDurable(true);
       long time = System.currentTimeMillis();
@@ -484,6 +496,47 @@
       session.close();
    }
 
+
+   public void testManyMessagesSameTime() throws Exception
+   {
+
+      ClientSessionFactory sessionFactory = locator.createSessionFactory();
+      ClientSession session = sessionFactory.createSession(false, false, false);
+      session.createQueue(atestq, atestq, null, true);
+      ClientProducer producer = session.createProducer(atestq);
+      long time = System.currentTimeMillis();
+      time += 1000;
+      
+      for (int i = 0; i < 1000; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+         message.putIntProperty("value", i);
+         message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
+         producer.send(message);
+      }
+      
+      session.commit();
+      
+      
+      session.start();
+      ClientConsumer consumer = session.createConsumer(atestq);
+      
+      for (int i = 0 ; i < 1000; i++)
+      {
+         ClientMessage message = consumer.receive(15000);
+         assertNotNull(message);
+         message.acknowledge();
+         
+         assertEquals(i, message.getIntProperty("value").intValue());
+      }
+      
+      session.commit();
+
+      Assert.assertNull(consumer.receiveImmediate());
+
+      session.close();
+   }
+
    public void testScheduledAndNormalMessagesDeliveredCorrectly(final boolean recover) throws Exception
    {
 
@@ -607,7 +660,76 @@
       Assert.assertNull(consumer.receiveImmediate());
       session.close();
    }
+   
+   
+   public void testPendingACKOnPrepared() throws Exception
+   {
+      
+      int NUMBER_OF_MESSAGES = 100;
+      
+      ClientSessionFactory sessionFactory = locator.createSessionFactory();
+      ClientSession session = sessionFactory.createSession(true, false, false);
+      session.createQueue(atestq, atestq, null, true);
 
+      ClientProducer producer = session.createProducer(atestq);
+      
+      long scheduled = System.currentTimeMillis() + 1000;
+      for (int i = 0 ; i < NUMBER_OF_MESSAGES; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+         msg.putIntProperty("value", i);
+         msg.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, scheduled);
+         producer.send(msg);
+      }
+      
+      session.close();
+      
+      
+      for (int i = 0 ; i < NUMBER_OF_MESSAGES; i++)
+      {
+         Xid xid = newXID();
+
+         session = sessionFactory.createSession(true, false, false);
+         
+         ClientConsumer consumer = session.createConsumer(atestq);
+         
+         session.start();
+
+         session.start(xid, XAResource.TMNOFLAGS);
+         
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         session.end(xid, XAResource.TMSUCCESS);
+         
+         session.prepare(xid);
+         
+         session.close();
+      }
+      
+      sessionFactory.close();
+      locator.close();
+      
+      server.stop();
+      
+      startServer();
+      
+      sessionFactory = locator.createSessionFactory();
+      
+      session = sessionFactory.createSession(false, false);
+      
+      ClientConsumer consumer = session.createConsumer(atestq);
+      
+      session.start();
+      
+      assertNull(consumer.receive(1000));
+      
+      session.close();
+      
+      sessionFactory.close();
+
+   }
+
    public void testScheduledDeliveryTX() throws Exception
    {
       scheduledDelivery(true);
@@ -618,6 +740,116 @@
       scheduledDelivery(false);
    }
 
+   public void testRedeliveryAfterPrepare() throws Exception
+   {
+      AddressSettings qs = new AddressSettings();
+      qs.setRedeliveryDelay(5000l);
+      server.getAddressSettingsRepository().addMatch(atestq.toString(), qs);
+
+      ClientSessionFactory sessionFactory = locator.createSessionFactory();
+      ClientSession session = sessionFactory.createSession(false, false, false);
+
+      session.createQueue(atestq, atestq, true);
+
+      ClientProducer producer = session.createProducer(atestq);
+      for (int i = 0; i < 100; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+         msg.putIntProperty("key", i);
+         producer.send(msg);
+         session.commit();
+      }
+
+      session.close();
+
+      session = sessionFactory.createSession(true, false, false);
+
+      ClientConsumer consumer = session.createConsumer(atestq);
+
+      ArrayList<Xid> xids = new ArrayList<Xid>();
+
+      session.start();
+
+      for (int i = 0; i < 100; i++)
+      {
+         Xid xid = newXID();
+         session.start(xid, XAResource.TMNOFLAGS);
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         session.end(xid, XAResource.TMSUCCESS);
+         session.prepare(xid);
+         xids.add(xid);
+      }
+
+      session.rollback(xids.get(0));
+      xids.set(0, null);
+      session.close();
+
+      server.stop();
+
+      configuration = createDefaultConfig();
+      configuration.setSecurityEnabled(false);
+      configuration.setJournalMinFiles(2);
+      configuration.getAddressesSettings().put(atestq.toString(), qs);
+
+      server = createServer(true, configuration);
+      server.start();
+
+      locator = createInVMNonHALocator();
+
+      final AtomicInteger count = new AtomicInteger(0);
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               ClientSessionFactory sf = locator.createSessionFactory();
+               ClientSession session = sf.createSession(false, false);
+               session.start();
+               ClientConsumer cons = session.createConsumer(atestq);
+               for (int i = 0; i < 100; i++)
+               {
+                  ClientMessage msg = cons.receive(100000);
+                  assertNotNull(msg);
+                  System.out.println("Received message " + msg);
+                  count.incrementAndGet();
+                  msg.acknowledge();
+                  session.commit();
+               }
+               session.close();
+               sf.close();
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               count.set(-1);
+            }
+         }
+      };
+
+      t.start();
+
+      sessionFactory = locator.createSessionFactory();
+
+      session = sessionFactory.createSession(true, false, false);
+
+      for (Xid xid : xids)
+      {
+         if (xid != null)
+         {
+            session.rollback(xid);
+         }
+      }
+
+      session.close();
+
+      t.join();
+
+      assertEquals(100, count.get());
+   }
+
    // Private -------------------------------------------------------
 
    private void scheduledDelivery(final boolean tx) throws Exception
@@ -770,10 +1002,10 @@
    private ClientMessage createDurableMessage(final ClientSession session, final String body)
    {
       ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                          true,
-                                                          0,
-                                                          System.currentTimeMillis(),
-                                                          (byte)1);
+                                                    true,
+                                                    0,
+                                                    System.currentTimeMillis(),
+                                                    (byte)1);
       message.getBodyBuffer().writeString(body);
       return message;
    }



More information about the hornetq-commits mailing list