[hornetq-commits] JBoss hornetq SVN: r10131 - in branches/Branch_2_2_EAP: src/main/org/hornetq/api/jms/management and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jan 21 17:44:34 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-01-21 17:44:33 -0500 (Fri, 21 Jan 2011)
New Revision: 10131

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/QueueControl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/PostOffice.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
JBPAPP-5800 / HORNETQ-605 - moving messages while ignoring duplicateIDs

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/QueueControl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/QueueControl.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/core/management/QueueControl.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -185,22 +185,46 @@
     *
     * @return {@code true} if the message was moved, {@code false} else
     */
-   @Operation(desc = "Move the message corresponding to the given messageID to another queue", impact = MBeanOperationInfo.ACTION)
+   @Operation(desc = "Move the message corresponding to the given messageID to another queue. rejectDuplicate=false on this case", impact = MBeanOperationInfo.ACTION)
    boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
                        @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName) throws Exception;
 
    /**
+    * Moves the message corresponding to the specified message ID to the specified other queue.
+    *
+    * @return {@code true} if the message was moved, {@code false} else
+    */
+   @Operation(desc = "Move the message corresponding to the given messageID to another queue", impact = MBeanOperationInfo.ACTION)
+   boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
+                       @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName,
+                       @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
+
+   /**
     * Moves all the message corresponding to the specified filter  to the specified other queue.
+    * This overload does not reject duplicates (rejectDuplicates = false).
     * <br>
     * Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
     * 
     * @return the number of moved messages
     */
-   @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
+   @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages). RejectDuplicates=false on this case.", impact = MBeanOperationInfo.ACTION)
    int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
                     @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName) throws Exception;
 
+
    /**
+    * Moves all the messages corresponding to the specified filter to the specified other queue.
+    * <br>
+    * Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
+    * 
+    * @return the number of moved messages
+    */
+   @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
+   int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
+                    @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
+                    @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
+
+   /**
     * Sends the message corresponding to the specified message ID to this queue's dead letter address.
     *
     * @return {@code true} if the message was sent to the dead letter address, {@code false} else

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/api/jms/management/JMSQueueControl.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -179,7 +179,16 @@
    @Operation(desc = "Change the priority of the messages corresponding to the given filter", impact = MBeanOperationInfo.ACTION)
    int changeMessagesPriority(@Parameter(name = "filter", desc = "A message filter") String filter,
                               @Parameter(name = "newPriority", desc = "the new priority (between 0 and 9)") int newPriority) throws Exception;
+   /**
+    * Moves the message corresponding to the specified message ID to the specified other queue.
+    *
+    * @return {@code true} if the message was moved, {@code false} else
+    */
+   @Operation(desc = "Move the message corresponding to the given messageID to another queue, ignoring duplicates (rejectDuplicates=false on this case)", impact = MBeanOperationInfo.ACTION)
+   boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") String messageID,
+                       @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName) throws Exception;
 
+
    /**
     * Moves the message corresponding to the specified message ID to the specified other queue.
     *
@@ -187,20 +196,34 @@
     */
    @Operation(desc = "Move the message corresponding to the given messageID to another queue", impact = MBeanOperationInfo.ACTION)
    boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") String messageID,
-                       @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName) throws Exception;
+                       @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName,
+                       @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
 
    /**
     * Moves all the message corresponding to the specified filter  to the specified other queue.
+    * This overload does not reject duplicates (rejectDuplicates = false).
     * <br>
     * Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
     * 
     * @return the number of moved messages
     */
-   @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
+   @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages). rejectDuplicates=false on this case", impact = MBeanOperationInfo.ACTION)
    int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
                     @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName) throws Exception;
 
    /**
+    * Moves all the messages corresponding to the specified filter to the specified other queue.
+    * <br>
+    * Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
+    * 
+    * @return the number of moved messages
+    */
+   @Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
+   int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
+                    @Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
+                    @Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
+
+   /**
     * Lists the message counter for this queue.
     */
    @Operation(desc = "List the message counters", impact = MBeanOperationInfo.INFO)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -543,6 +543,11 @@
 
    public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
    {
+      return moveMessage(messageID, otherQueueName, false);
+   }
+
+   public boolean moveMessage(final long messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+   {
       checkStarted();
 
       clearIO();
@@ -566,6 +571,12 @@
 
    public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
    {
+      return moveMessages(filterStr, otherQueueName, false);
+   }
+
+
+   public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+   {
       checkStarted();
 
       clearIO();
@@ -580,7 +591,7 @@
             throw new IllegalArgumentException("No queue found for " + otherQueueName);
          }
 
-         int retValue = queue.moveReferences(filter, binding.getAddress());
+         int retValue = queue.moveReferences(filter, binding.getAddress(), rejectDuplicates);
 
          return retValue;
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/PostOffice.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/PostOffice.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -53,8 +53,12 @@
 
    void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
 
+   void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception;
+
    void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception;
 
+   void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception;
+
    MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
 
    boolean redistribute(ServerMessage message, final Queue originatingQueue, Transaction tx) throws Exception;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -541,8 +541,18 @@
       route(message, new RoutingContextImpl(tx), direct);
    }
 
+   public void route(final ServerMessage message, final Transaction tx, final boolean direct, final boolean rejectDuplicates) throws Exception
+   {
+      route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
+   }
+
    public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
    {
+      route(message, context, direct, true);
+   }
+
+   public void route(final ServerMessage message, final RoutingContext context, final boolean direct, final boolean rejectDuplicates) throws Exception
+   {
       // Sanity check
       if (message.getRefCount() > 0)
       {
@@ -557,11 +567,15 @@
 
       DuplicateIDCache cache = null;
       
+      boolean isDuplicate = false;
+      
       if (duplicateIDBytes != null)
       {
          cache = getDuplicateIDCache(message.getAddress());
+         
+         isDuplicate = cache.contains(duplicateIDBytes);
 
-         if (cache.contains(duplicateIDBytes))
+         if (rejectDuplicates && isDuplicate)
          {
             if (context.getTransaction() == null)
             {
@@ -580,7 +594,7 @@
 
       boolean startedTx = false;
 
-      if (cache != null)
+      if (cache != null && !isDuplicate)
       {
          if (context.getTransaction() == null)
          {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -115,8 +115,12 @@
 
    boolean moveReference(long messageID, SimpleString toAddress) throws Exception;
 
+   boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception;
+
    int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
 
+   int moveReferences(Filter filter, SimpleString toAddress, boolean rejectDuplicates) throws Exception;
+
    void addRedistributor(long delay);
 
    void cancelRedistributor() throws Exception;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -974,8 +974,13 @@
       return count;
    }
 
-   public synchronized boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
+   public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
    {
+      return moveReference(messageID, toAddress, false);
+   }
+   
+   public synchronized boolean moveReference(final long messageID, final SimpleString toAddress, final boolean rejectDuplicate) throws Exception
+   {
       Iterator<MessageReference> iter = iterator();
 
       while (iter.hasNext())
@@ -1000,8 +1005,13 @@
       return false;
    }
 
-   public synchronized int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
+   public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
    {
+      return moveReferences(filter, toAddress, false);
+   }
+
+   public synchronized int moveReferences(final Filter filter, final SimpleString toAddress, final boolean rejectDuplicates) throws Exception
+   {
       Transaction tx = new TransactionImpl(storageManager);
 
       int count = 0;
@@ -1022,19 +1032,23 @@
                deliveringCount.incrementAndGet();
                count++;
 
-               byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
-               if (duplicateBytes != null)
+               if (rejectDuplicates)
                {
-                  if (targetDuplicateCache.contains(duplicateBytes))
+                  byte [] duplicateBytes = ref.getMessage().getDuplicateIDBytes();
+                  if (duplicateBytes != null)
                   {
-                     log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
-                     acknowledge(tx, ref);
-                     ignored = true;
+                     if (targetDuplicateCache.contains(duplicateBytes))
+                     {
+                        log.info("Message with duplicate ID " + ref.getMessage().getDuplicateProperty() + " was already set at " + toAddress + ". Move from " + this.address + " being ignored and message removed from " + this.address);
+                        acknowledge(tx, ref);
+                        ignored = true;
+                     }
                   }
                }
+               
                if (!ignored)
                {
-                  move(toAddress, tx, ref, false);
+                  move(toAddress, tx, ref, false, rejectDuplicates);
                }
                iter.remove();
             }
@@ -1055,7 +1069,7 @@
    
             deliveringCount.incrementAndGet();
             count++;
-            move(toAddress, tx, ref, false);
+            move(toAddress, tx, ref, false, rejectDuplicates);
             acknowledge(tx, ref);
          }
    
@@ -1418,13 +1432,14 @@
    private void move(final SimpleString toAddress,
                      final Transaction tx,
                      final MessageReference ref,
-                     final boolean expiry) throws Exception
+                     final boolean expiry,
+                     final boolean rejectDuplicate) throws Exception
    {
       ServerMessage copyMessage = makeCopy(ref, expiry);
 
       copyMessage.setAddress(toAddress);
 
-      postOffice.route(copyMessage, tx, false);
+      postOffice.route(copyMessage, tx, false, rejectDuplicate);
 
       acknowledge(tx, ref);
    }
@@ -1463,7 +1478,7 @@
          }
          else
          {
-            move(expiryAddress, tx, ref, true);
+            move(expiryAddress, tx, ref, true, true);
          }
       }
       else

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/management/impl/JMSQueueControlImpl.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -284,9 +284,14 @@
 
    public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception
    {
+      return moveMessage(messageID, otherQueueName, false);
+   }
+
+   public boolean moveMessage(final String messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+   {
       String filter = JMSQueueControlImpl.createFilterForJMSMessageID(messageID);
       HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
-      int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress());
+      int moved = coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
       if (moved != 1)
       {
          throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
@@ -295,13 +300,19 @@
       return true;
    }
 
-   public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
+   public int moveMessages(final String filterStr, final String otherQueueName, final boolean rejectDuplicates) throws Exception
    {
       String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
       HornetQDestination otherQueue = HornetQDestination.createQueue(otherQueueName);
-      return coreQueueControl.moveMessages(filter, otherQueue.getAddress());
+      return coreQueueControl.moveMessages(filter, otherQueue.getAddress(), rejectDuplicates);
    }
 
+
+   public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
+   {
+      return moveMessages(filterStr, otherQueueName, false);
+   }
+
    @Operation(desc = "List all the existent consumers on the Queue")
    public String listConsumersAsJSON() throws Exception
    {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -762,6 +762,7 @@
       serverManager.destroyQueue(otherQueueName);
    }
 
+   
    public void testMoveMessagesWithDuplicateIDSet() throws Exception
    {
       String otherQueueName = RandomUtil.randomString();
@@ -799,7 +800,7 @@
 
       Assert.assertEquals(10, queueControl.getMessageCount());
 
-      int moved = queueControl.moveMessages(null, otherQueueName);
+      int moved = queueControl.moveMessages(null, otherQueueName, true);
 
       assertEquals(10, moved);
 
@@ -886,7 +887,7 @@
 
       for (int i = 0 ; i < 10; i++)
       {
-         queueControl.moveMessage(ids[i], otherQueueName);
+         queueControl.moveMessage(ids[i], otherQueueName, true);
       }
 
       assertEquals(0, queueControl.getDeliveringCount());
@@ -955,7 +956,7 @@
       Assert.assertEquals(1, queueControl.getMessageCount());
       Assert.assertEquals(1, otherQueueControl.getMessageCount());
 
-      int moved = queueControl.moveMessages(null, otherQueueName);
+      int moved = queueControl.moveMessages(null, otherQueueName, true);
 
       assertEquals(1, moved);
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -202,6 +202,16 @@
             return (String)proxy.invokeOperation("listMessagesAsJSON", filter);
          }
 
+         public boolean moveMessage(String messageID, String otherQueueName, boolean rejectDuplicates) throws Exception
+         {
+            return (Boolean)proxy.invokeOperation("moveMessage", messageID, otherQueueName, rejectDuplicates);
+         }
+
+         public int moveMessages(String filter, String otherQueueName, boolean rejectDuplicates) throws Exception
+         {
+            return (Integer)proxy.invokeOperation("moveMessages", filter, otherQueueName, rejectDuplicates);
+         }
+
          public int moveMessages(final String filter, final String otherQueueName) throws Exception
          {
             return (Integer)proxy.invokeOperation("moveMessages", filter, otherQueueName);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -20,23 +20,26 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.api.core.management.DayCounterInfo;
 import org.hornetq.api.core.management.HornetQServerControl;
 import org.hornetq.api.core.management.MessageCounterInfo;
 import org.hornetq.api.core.management.QueueControl;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.json.JSONArray;
-import org.hornetq.utils.json.JSONObject;
 
 /**
  * A QueueControlTest
@@ -212,9 +215,8 @@
       ClientConsumer consumer = session.createConsumer(queue);
       Assert.assertEquals(1, queueControl.getConsumerCount());
 
-      
       System.out.println("Consumers: " + queueControl.listConsumersAsJSON());
-      
+
       JSONArray obj = new JSONArray(queueControl.listConsumersAsJSON());
 
       assertEquals(1, obj.length());
@@ -1272,6 +1274,77 @@
 
    }
 
+   public void testMoveMessagesBack() throws Exception
+   {
+      server.createQueue(new SimpleString("q1"), new SimpleString("q1"), null, true, false);
+      server.createQueue(new SimpleString("q2"), new SimpleString("q2"), null, true, false);
+
+      ServerLocator locator = createInVMNonHALocator();
+
+      ClientSessionFactory sf = locator.createSessionFactory();
+
+      ClientSession session = sf.createSession(true, true);
+
+      ClientProducer prod1 = session.createProducer("q1");
+
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage msg = session.createMessage(true);
+
+         msg.putStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("dupl-" + i));
+
+         prod1.send(msg);
+      }
+
+      session.commit();
+
+      ClientConsumer consumer = session.createConsumer("q1", true);
+      session.start();
+
+      assertNotNull(consumer.receive(5000));
+      consumer.close();
+
+      QueueControl q1Control = ManagementControlHelper.createQueueControl(new SimpleString("q1"),
+                                                                          new SimpleString("q1"),
+                                                                          mbeanServer);
+
+      QueueControl q2Control = ManagementControlHelper.createQueueControl(new SimpleString("q2"),
+                                                                          new SimpleString("q2"),
+                                                                          mbeanServer);
+
+      assertEquals(10, q1Control.moveMessages(null, "q2"));
+
+      consumer = session.createConsumer("q2", true);
+
+      assertNotNull(consumer.receive(500));
+
+      consumer.close();
+
+      q2Control.moveMessages(null, "q1", false);
+
+      session.start();
+      consumer = session.createConsumer("q1");
+
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage msg = consumer.receive(5000);
+         System.out.println("msg = " + msg);
+         assertNotNull(msg);
+         msg.acknowledge();
+      }
+
+      consumer.close();
+
+      session.deleteQueue("q1");
+
+      session.deleteQueue("q2");
+
+      session.close();
+
+      locator.close();
+
+   }
+
    public void testPauseAndResume()
    {
       long counterPeriod = 1000;

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -206,11 +206,21 @@
             return (Integer)proxy.invokeOperation("moveMessages", filter, otherQueueName);
          }
 
+         public int moveMessages(final String filter, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+         {
+            return (Integer)proxy.invokeOperation("moveMessages", filter, otherQueueName, rejectDuplicates);
+         }
+
          public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
          {
             return (Boolean)proxy.invokeOperation("moveMessage", messageID, otherQueueName);
          }
 
+         public boolean moveMessage(final long messageID, final String otherQueueName, final boolean rejectDuplicates) throws Exception
+         {
+            return (Boolean)proxy.invokeOperation("moveMessage", messageID, otherQueueName, rejectDuplicates);
+         }
+
          public int removeMessages(final String filter) throws Exception
          {
             return (Integer)proxy.invokeOperation("removeMessages", filter);
@@ -265,7 +275,6 @@
          {
             return (String)proxy.invokeOperation("listConsumersAsJSON");
          }
-
       };
    }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -621,4 +621,22 @@
       this.subs = sub;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#moveReference(long, org.hornetq.api.core.SimpleString, boolean)
+    */
+   public boolean moveReference(long messageID, SimpleString toAddress, boolean rejectDuplicates) throws Exception
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#moveReferences(org.hornetq.core.filter.Filter, org.hornetq.api.core.SimpleString, boolean)
+    */
+   public int moveReferences(Filter filter, SimpleString toAddress, boolean rejectDuplicates) throws Exception
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
+
 }
\ No newline at end of file

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2011-01-21 18:54:45 UTC (rev 10130)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2011-01-21 22:44:33 UTC (rev 10131)
@@ -198,4 +198,22 @@
       
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext, boolean, boolean)
+    */
+   public void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction, boolean, boolean)
+    */
+   public void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }
\ No newline at end of file



More information about the hornetq-commits mailing list