Author: clebert.suconic(a)jboss.com
Date: 2011-03-14 12:46:25 -0400 (Mon, 14 Mar 2011)
New Revision: 10325
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6091 - moveMessage (single message) was not ignoring duplicates:
the rejectDuplicates flag was not passed through to the move (the feature was only working with moveMessages)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-03-14 15:46:46 UTC (rev 10324)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-03-14 16:46:25 UTC (rev 10325)
@@ -575,7 +575,7 @@
throw new IllegalArgumentException("No queue found for " +
otherQueueName);
}
- return queue.moveReference(messageID, binding.getAddress());
+ return queue.moveReference(messageID, binding.getAddress(), rejectDuplicates);
}
finally
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-03-14 15:46:46 UTC (rev 10324)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-03-14 16:46:25 UTC (rev 10325)
@@ -826,7 +826,7 @@
{
if (expiryAddress != null)
{
- move(expiryAddress, ref, true);
+ move(expiryAddress, ref, true, false);
}
else
{
@@ -1101,7 +1101,7 @@
deliveringCount.incrementAndGet();
try
{
- move(toAddress, ref);
+ move(toAddress, ref, false, rejectDuplicate);
}
catch (Exception e)
{
@@ -1627,11 +1627,6 @@
return messageReferences.size();
}
- private void move(final SimpleString toAddress, final MessageReference ref) throws
Exception
- {
- move(toAddress, ref, false);
- }
-
private void move(final SimpleString toAddress,
final Transaction tx,
final MessageReference ref,
@@ -1711,7 +1706,7 @@
QueueImpl.log.warn("Message has reached maximum delivery attempts,
sending it to Dead Letter Address " + deadLetterAddress +
" from " +
name);
- move(deadLetterAddress, ref, false);
+ move(deadLetterAddress, ref, false, false);
}
}
else
@@ -1723,7 +1718,7 @@
}
}
- private void move(final SimpleString address, final MessageReference ref, final
boolean expiry) throws Exception
+ private void move(final SimpleString address, final MessageReference ref, final
boolean expiry, final boolean rejectDuplicate) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -1731,7 +1726,7 @@
copyMessage.setAddress(address);
- postOffice.route(copyMessage, tx, false);
+ postOffice.route(copyMessage, tx, false, rejectDuplicate);
acknowledge(tx, ref);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-03-14 15:46:46 UTC (rev 10324)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-03-14 16:46:25 UTC (rev 10325)
@@ -1345,6 +1345,92 @@
}
+ public void testMoveMessagesBack2() throws Exception
+ {
+ server.createQueue(new SimpleString("q1"), new
SimpleString("q1"), null, true, false);
+ server.createQueue(new SimpleString("q2"), new
SimpleString("q2"), null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod1 = session.createProducer("q1");
+
+ int NUMBER_OF_MSGS = 10;
+
+ for (int i = 0; i < NUMBER_OF_MSGS; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+
+ msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID,
new SimpleString("dupl-" + i));
+
+ prod1.send(msg);
+ }
+
+ session.commit();
+
+ ClientConsumer consumer = session.createConsumer("q1", true);
+ session.start();
+
+ assertNotNull(consumer.receive(5000));
+ consumer.close();
+
+ QueueControl q1Control = ManagementControlHelper.createQueueControl(new
SimpleString("q1"),
+ new
SimpleString("q1"),
+ mbeanServer);
+
+ QueueControl q2Control = ManagementControlHelper.createQueueControl(new
SimpleString("q2"),
+ new
SimpleString("q2"),
+ mbeanServer);
+
+ assertEquals(NUMBER_OF_MSGS, q1Control.moveMessages(null, "q2"));
+
+ long messageIDs[] = new long[NUMBER_OF_MSGS];
+
+ consumer = session.createConsumer("q2", true);
+
+ for (int i = 0 ; i < NUMBER_OF_MSGS; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ messageIDs[i] = msg.getMessageID();
+ }
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ for (int i = 0 ; i < NUMBER_OF_MSGS; i++)
+ {
+ q2Control.moveMessage(messageIDs[i], "q1");
+ }
+
+
+ session.start();
+ consumer = session.createConsumer("q1");
+
+ for (int i = 0; i < NUMBER_OF_MSGS; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ System.out.println("msg = " + msg);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ consumer.close();
+
+ session.deleteQueue("q1");
+
+ session.deleteQueue("q2");
+
+ session.close();
+
+ locator.close();
+
+ }
+
public void testPauseAndResume()
{
long counterPeriod = 1000;