[hornetq-commits] JBoss hornetq SVN: r10131 - in branches/Branch_2_2_EAP: src/main/org/hornetq/api/jms/management and 10 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Jan 21 17:44:34 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-01-21 17:44:33 -0500 (Fri, 21 Jan 2011)
New Revision: 10131
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/QueueControl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/PostOffice.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
JBPAPP-5800 / HORNETQ-605 - moving messages while ignoring duplicateIDs
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/QueueControl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/QueueControl.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/QueueControl.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -185,22 +185,46 @@
*
* @return {@code true} if the message was moved, {@code false} else
*/
- @Operation(desc = "Move the message corresponding to the given messageID to another queue", impact = MBeanOperationInfo.ACTION)
+ @Operation(desc = "Move the message corresponding to the given messageID to another queue. rejectDuplicate=false on this case", impact = MBeanOperationInfo.ACTION)
boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName) throws Exception;
/**
+ * Moves the message corresponding to the specified message ID to the specified other queue.
+ *
+ * @return {@code true} if the message was moved, {@code false} else
+ */
+ @Operation(desc = "Move the message corresponding to the given messageID to another queue", impact = MBeanOperationInfo.ACTION)
+ boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
+ @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName,
+ @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
+
+ /**
* Moves all the message corresponding to the specified filter to the specified other queue.
+ * RejectDuplicates = false in this case
* <br>
* Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
*
* @return the number of moved messages
*/
- @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
+ @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages). RejectDuplicates=false on this case.", impact = MBeanOperationInfo.ACTION)
int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName) throws Exception;
+
/**
+ * Moves all the message corresponding to the specified filter to the specified other queue.
+ * <br>
+ * Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
+ *
+ * @return the number of moved messages
+ */
+ @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
+ int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
+ @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
+ @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
+
+ /**
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
*
* @return {@code true} if the message was sent to the dead letter address, {@code false} else
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -179,7 +179,16 @@
@Operation(desc = "Change the priority of the messages corresponding to the given filter", impact = MBeanOperationInfo.ACTION)
int changeMessagesPriority(@Parameter(name = "filter", desc = "A message filter") String filter,
@Parameter(name = "newPriority", desc = "the new priority (between 0 and 9)") int newPriority) throws Exception;
+ /**
+ * Moves the message corresponding to the specified message ID to the specified other queue.
+ *
+ * @return {@code true} if the message was moved, {@code false} else
+ */
+ @Operation(desc = "Move the message corresponding to the given messageID to another queue, ignoring duplicates (rejectDuplicates=false on this case)", impact = MBeanOperationInfo.ACTION)
+ boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") String messageID,
+ @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName) throws Exception;
+
/**
* Moves the message corresponding to the specified message ID to the specified other queue.
*
@@ -187,20 +196,34 @@
*/
@Operation(desc = "Move the message corresponding to the given messageID to another queue", impact = MBeanOperationInfo.ACTION)
boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") String messageID,
- @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName) throws Exception;
+ @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName,
+ @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
/**
* Moves all the message corresponding to the specified filter to the specified other queue.
+ * RejectDuplicates=false in this case
* <br>
* Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
*
* @return the number of moved messages
*/
- @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
+ @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages). rejectDuplicates=false on this case", impact = MBeanOperationInfo.ACTION)
int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName) throws Exception;
/**
+ * Moves all the message corresponding to the specified filter to the specified other queue.
+ * <br>
+ * Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
+ *
+ * @return the number of moved messages
+ */
+ @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
+ int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
+ @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
+ @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
+
+ /**
* Lists the message counter for this queue.
*/
@Operation(desc = "List the message counters", impact = MBeanOperationInfo.INFO)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -543,6 +543,11 @@
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
{
+ return moveMessage(messageID, otherQueueName, false);
+ }
+
+ public boolean moveMessage(final long messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+ {
checkStarted();
clearIO();
@@ -566,6 +571,12 @@
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
{
+ return moveMessages(filterStr, otherQueueName, false);
+ }
+
+
+ public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+ {
checkStarted();
clearIO();
@@ -580,7 +591,7 @@
throw new IllegalArgumentException("No queue found for " + otherQueueName);
}
- int retValue = queue.moveReferences(filter, binding.getAddress());
+ int retValue = queue.moveReferences(filter, binding.getAddress(), rejectDuplicates);
return retValue;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/PostOffice.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/PostOffice.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -53,8 +53,12 @@
void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
+ void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception;
+
void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception;
+ void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception;
+
MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
boolean redistribute(ServerMessage message, final Queue originatingQueue, Transaction tx) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -541,8 +541,18 @@
route(message, new RoutingContextImpl(tx), direct);
}
+ public void route(final ServerMessage message, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception
+ {
+ route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
+ }
+
public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
{
+ route(message, context, direct, true);
+ }
+
+ public void route(final ServerMessage message, final RoutingContext context, final boolean direct, final boolean rejectDuplicates) throws Exception
+ {
// Sanity check
if (message.getRefCount() > 0)
{
@@ -557,11 +567,15 @@
DuplicateIDCache cache = null;
+ boolean isDuplicate = false;
+
if (duplicateIDBytes != null)
{
cache = getDuplicateIDCache(message.getAddress());
+
+ isDuplicate = cache.contains(duplicateIDBytes);
- if (cache.contains(duplicateIDBytes))
+ if (rejectDuplicates && isDuplicate)
{
if (context.getTransaction() == null)
{
@@ -580,7 +594,7 @@
boolean startedTx = false;
- if (cache != null)
+ if (cache != null && !isDuplicate)
{
if (context.getTransaction() == null)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -115,8 +115,12 @@
boolean moveReference(long messageID, SimpleString toAddress) throws Exception;
+ boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception;
+
int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
+ int moveReferences(Filter filter, SimpleString toAddress, boolean rejectDuplicates) throws Exception;
+
void addRedistributor(long delay);
void cancelRedistributor() throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -974,8 +974,13 @@
return count;
}
- public synchronized boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
+ public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
{
+ return moveReference(messageID, toAddress, false);
+ }
+
+ public synchronized boolean moveReference(final long messageID, final SimpleString toAddress, final boolean rejectDuplicate) throws Exception
+ {
Iterator<MessageReference> iter = iterator();
while (iter.hasNext())
@@ -1000,8 +1005,13 @@
return false;
}
- public synchronized int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
+ public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
{
+ return moveReferences(filter, toAddress, false);
+ }
+
+ public synchronized int moveReferences(final Filter filter, final SimpleString toAddress, final boolean rejectDuplicates) throws Exception
+ {
Transaction tx = new TransactionImpl(storageManager);
int count = 0;
@@ -1022,19 +1032,23 @@
deliveringCount.incrementAndGet();
count++;
- byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
- if (duplicateBytes != null)
+ if (rejectDuplicates)
{
- if (targetDuplicateCache.contains(duplicateBytes))
+ byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+ if (duplicateBytes != null)
{
- log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
- acknowledge(tx, ref);
- ignored = true;
+ if (targetDuplicateCache.contains(duplicateBytes))
+ {
+ log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
+ acknowledge(tx, ref);
+ ignored = true;
+ }
}
}
+
if (!ignored)
{
- move(toAddress, tx, ref, false);
+ move(toAddress, tx, ref, false, rejectDuplicates);
}
iter.remove();
}
@@ -1055,7 +1069,7 @@
deliveringCount.incrementAndGet();
count++;
- move(toAddress, tx, ref, false);
+ move(toAddress, tx, ref, false, rejectDuplicates);
acknowledge(tx, ref);
}
@@ -1418,13 +1432,14 @@
private void move(final SimpleString toAddress,
final Transaction tx,
final MessageReference ref,
- final boolean expiry) throws Exception
+ final boolean expiry,
+ final boolean rejectDuplicate) throws Exception
{
ServerMessage copyMessage = makeCopy(ref, expiry);
copyMessage.setAddress(toAddress);
- postOffice.route(copyMessage, tx, false);
+ postOffice.route(copyMessage, tx, false, rejectDuplicate);
acknowledge(tx, ref);
}
@@ -1463,7 +1478,7 @@
}
else
{
- move(expiryAddress, tx, ref, true);
+ move(expiryAddress, tx, ref, true, true);
}
}
else
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -284,9 +284,14 @@
public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception
{
+ return moveMessage(messageID, otherQueueName, false);
+ }
+
+ public boolean moveMessage(final String messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+ {
String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
- int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress());
+ int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
if (moved != 1)
{
throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
@@ -295,13 +300,19 @@
return true;
}
- public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
+ public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
{
String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
- return coreQueueControl.moveMessages(filter, otherQueue.getAddress());
+ return coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
}
+
+ public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
+ {
+ return moveMessages(filterStr, otherQueueName, false);
+ }
+
@Operation(desc = "List all the existent consumers on the Queue")
public String listConsumersAsJSON() throws Exception
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -762,6 +762,7 @@
serverManager.destroyQueue(otherQueueName);
}
+
public void testMoveMessagesWithDuplicateIDSet() throws Exception
{
String otherQueueName = RandomUtil.randomString();
@@ -799,7 +800,7 @@
Assert.assertEquals(10, queueControl.getMessageCount());
- int moved = queueControl.moveMessages(null, otherQueueName);
+ int moved = queueControl.moveMessages(null, otherQueueName, true);
assertEquals(10, moved);
@@ -886,7 +887,7 @@
for (int i = 0 ; i < 10; i++)
{
- queueControl.moveMessage(ids[i], otherQueueName);
+ queueControl.moveMessage(ids[i], otherQueueName, true);
}
assertEquals(0, queueControl.getDeliveringCount());
@@ -955,7 +956,7 @@
Assert.assertEquals(1, queueControl.getMessageCount());
Assert.assertEquals(1, otherQueueControl.getMessageCount());
- int moved = queueControl.moveMessages(null, otherQueueName);
+ int moved = queueControl.moveMessages(null, otherQueueName, true);
assertEquals(1, moved);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -202,6 +202,16 @@
return (String)proxy.invokeOperation("listMessagesAsJSON", filter);
}
+ public boolean moveMessage(String messageID, String otherQueueName, boolean rejectDuplicates) throws Exception
+ {
+ return (Boolean)proxy.invokeOperation("moveMessage", messageID, otherQueueName, rejectDuplicates);
+ }
+
+ public int moveMessages(String filter, String otherQueueName, boolean rejectDuplicates) throws Exception
+ {
+ return (Integer)proxy.invokeOperation("moveMessages", filter, otherQueueName, rejectDuplicates);
+ }
+
public int moveMessages(final String filter, final String otherQueueName) throws Exception
{
return (Integer)proxy.invokeOperation("moveMessages", filter, otherQueueName);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -20,23 +20,26 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.DayCounterInfo;
import org.hornetq.api.core.management.HornetQServerControl;
import org.hornetq.api.core.management.MessageCounterInfo;
import org.hornetq.api.core.management.QueueControl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.json.JSONArray;
-import org.hornetq.utils.json.JSONObject;
/**
* A QueueControlTest
@@ -212,9 +215,8 @@
ClientConsumer consumer = session.createConsumer(queue);
Assert.assertEquals(1, queueControl.getConsumerCount());
-
System.out.println("Consumers: " + queueControl.listConsumersAsJSON());
-
+
JSONArray obj = new JSONArray(queueControl.listConsumersAsJSON());
assertEquals(1, obj.length());
@@ -1272,6 +1274,77 @@
}
+ public void testMoveMessagesBack() throws Exception
+ {
+ server.createQueue(new SimpleString("q1"), new SimpleString("q1"), null, true, false);
+ server.createQueue(new SimpleString("q2"), new SimpleString("q2"), null, true, false);
+
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(true, true);
+
+ ClientProducer prod1 = session.createProducer("q1");
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+
+ msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
+
+ prod1.send(msg);
+ }
+
+ session.commit();
+
+ ClientConsumer consumer = session.createConsumer("q1", true);
+ session.start();
+
+ assertNotNull(consumer.receive(5000));
+ consumer.close();
+
+ QueueControl q1Control = ManagementControlHelper.createQueueControl(new SimpleString("q1"),
+ new SimpleString("q1"),
+ mbeanServer);
+
+ QueueControl q2Control = ManagementControlHelper.createQueueControl(new SimpleString("q2"),
+ new SimpleString("q2"),
+ mbeanServer);
+
+ assertEquals(10, q1Control.moveMessages(null, "q2"));
+
+ consumer = session.createConsumer("q2", true);
+
+ assertNotNull(consumer.receive(500));
+
+ consumer.close();
+
+ q2Control.moveMessages(null, "q1", false);
+
+ session.start();
+ consumer = session.createConsumer("q1");
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ System.out.println("msg = " + msg);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ consumer.close();
+
+ session.deleteQueue("q1");
+
+ session.deleteQueue("q2");
+
+ session.close();
+
+ locator.close();
+
+ }
+
public void testPauseAndResume()
{
long counterPeriod = 1000;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -206,11 +206,21 @@
return (Integer)proxy.invokeOperation("moveMessages", filter, otherQueueName);
}
+ public int moveMessages(final String filter, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+ {
+ return (Integer)proxy.invokeOperation("moveMessages", filter, otherQueueName, rejectDuplicates);
+ }
+
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
{
return (Boolean)proxy.invokeOperation("moveMessage", messageID, otherQueueName);
}
+ public boolean moveMessage(final long messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+ {
+ return (Boolean)proxy.invokeOperation("moveMessage", messageID, otherQueueName, rejectDuplicates);
+ }
+
public int removeMessages(final String filter) throws Exception
{
return (Integer)proxy.invokeOperation("removeMessages", filter);
@@ -265,7 +275,6 @@
{
return (String)proxy.invokeOperation("listConsumersAsJSON");
}
-
};
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -621,4 +621,22 @@
this.subs = sub;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#moveReference(long, org.hornetq.api.core.SimpleString, boolean)
+ */
+ public boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#moveReferences(org.hornetq.core.filter.Filter, org.hornetq.api.core.SimpleString, boolean)
+ */
+ public int moveReferences(Filter filter, SimpleString toAddress, boolean rejectDuplicates) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2011-01-21 22:44:33 UTC (rev 10131)
@@ -198,4 +198,22 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext, boolean, boolean)
+ */
+ public void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction, boolean, boolean)
+ */
+ public void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
More information about the hornetq-commits mailing list