[hornetq-commits] JBoss hornetq SVN: r10325 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Mar 14 12:46:25 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-14 12:46:25 -0400 (Mon, 14 Mar 2011)
New Revision: 10325

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6091 - move message is not ignoring duplicates (feature was only working with moveMessages)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2011-03-14 15:46:46 UTC (rev 10324)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2011-03-14 16:46:25 UTC (rev 10325)
@@ -575,7 +575,7 @@
             throw new IllegalArgumentException("No queue found for " + otherQueueName);
          }
 
-         return queue.moveReference(messageID, binding.getAddress());
+         return queue.moveReference(messageID, binding.getAddress(), rejectDuplicates);
       }
       finally
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-03-14 15:46:46 UTC (rev 10324)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-03-14 16:46:25 UTC (rev 10325)
@@ -826,7 +826,7 @@
    {
       if (expiryAddress != null)
       {
-         move(expiryAddress, ref, true);
+         move(expiryAddress, ref, true, false);
       }
       else
       {
@@ -1101,7 +1101,7 @@
                deliveringCount.incrementAndGet();
                try
                {
-                  move(toAddress, ref);
+                  move(toAddress, ref, false, rejectDuplicate);
                }
                catch (Exception e)
                {
@@ -1627,11 +1627,6 @@
       return messageReferences.size();
    }
 
-   private void move(final SimpleString toAddress, final MessageReference ref) throws Exception
-   {
-      move(toAddress, ref, false);
-   }
-
    private void move(final SimpleString toAddress,
                      final Transaction tx,
                      final MessageReference ref,
@@ -1711,7 +1706,7 @@
             QueueImpl.log.warn("Message has reached maximum delivery attempts, sending it to Dead Letter Address " + deadLetterAddress +
                                " from " +
                                name);
-            move(deadLetterAddress, ref, false);
+            move(deadLetterAddress, ref, false, false);
          }
       }
       else
@@ -1723,7 +1718,7 @@
       }
    }
 
-   private void move(final SimpleString address, final MessageReference ref, final boolean expiry) throws Exception
+   private void move(final SimpleString address, final MessageReference ref, final boolean expiry, final boolean rejectDuplicate) throws Exception
    {
       Transaction tx = new TransactionImpl(storageManager);
 
@@ -1731,7 +1726,7 @@
 
       copyMessage.setAddress(address);
 
-      postOffice.route(copyMessage, tx, false);
+      postOffice.route(copyMessage, tx, false, rejectDuplicate);
 
       acknowledge(tx, ref);
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2011-03-14 15:46:46 UTC (rev 10324)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2011-03-14 16:46:25 UTC (rev 10325)
@@ -1345,6 +1345,92 @@
 
    }
 
+   public void testMoveMessagesBack2() throws Exception
+   {
+      server.createQueue(new SimpleString("q1"), new SimpleString("q1"), null, true, false);
+      server.createQueue(new SimpleString("q2"), new SimpleString("q2"), null, true, false);
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(true, true);
+
+      ClientProducer prod1 = session.createProducer("q1");
+      
+      int NUMBER_OF_MSGS = 10;
+
+      for (int i = 0; i < NUMBER_OF_MSGS; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+
+         msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
+
+         prod1.send(msg);
+      }
+
+      session.commit();
+
+      ClientConsumer consumer = session.createConsumer("q1", true);
+      session.start();
+
+      assertNotNull(consumer.receive(5000));
+      consumer.close();
+
+      QueueControl q1Control = ManagementControlHelper.createQueueControl(new SimpleString("q1"),
+                                                                          new SimpleString("q1"),
+                                                                          mbeanServer);
+
+      QueueControl q2Control = ManagementControlHelper.createQueueControl(new SimpleString("q2"),
+                                                                          new SimpleString("q2"),
+                                                                          mbeanServer);
+
+      assertEquals(NUMBER_OF_MSGS, q1Control.moveMessages(null, "q2"));
+
+      long messageIDs[] = new long[NUMBER_OF_MSGS];
+      
+      consumer = session.createConsumer("q2", true);
+      
+      for (int i = 0 ; i < NUMBER_OF_MSGS; i++)
+      {
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+         messageIDs[i] = msg.getMessageID();
+      }
+
+      assertNull(consumer.receiveImmediate());
+
+      consumer.close();
+      
+      for (int i = 0 ; i < NUMBER_OF_MSGS; i++)
+      {
+         q2Control.moveMessage(messageIDs[i], "q1");
+      }
+
+
+      session.start();
+      consumer = session.createConsumer("q1");
+
+      for (int i = 0; i < NUMBER_OF_MSGS; i++)
+      {
+         ClientMessage msg = consumer.receive(5000);
+         System.out.println("msg = " + msg);
+         assertNotNull(msg);
+         msg.acknowledge();
+      }
+
+      consumer.close();
+
+      session.deleteQueue("q1");
+
+      session.deleteQueue("q2");
+
+      session.close();
+
+      locator.close();
+
+   }
+
    public void testPauseAndResume()
    {
       long counterPeriod = 1000;



More information about the hornetq-commits mailing list