[jboss-cvs] JBoss Messaging SVN: r5439 - in trunk: src/main/org/jboss/messaging/core/management and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 27 10:59:16 EST 2008


Author: jmesnil
Date: 2008-11-27 10:59:16 -0500 (Thu, 27 Nov 2008)
New Revision: 5439

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
   trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
   trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/DestinationControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
   trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/RandomUtil.java
Log:
refactored Queue and MessageReference so that methods dealing with many messages (e.g. by passing a filter) do the work within a single transaction 

Modified: trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -97,6 +97,18 @@
   
   private final FilterParser parser = new FilterParser();
   
+  // Static ---------------------------------------------------------
+
+  /**
+   * @return <code>null</code> if <code>filterStr</code> is null, otherwise a filter created from <code>filterStr</code>
+   * @throws MessagingException if the string does not correspond to a valid filter
+   */
+  public static Filter createFilter(final String filterStr) throws MessagingException
+  {
+     Filter filter = (filterStr == null) ? null : new FilterImpl(new SimpleString(filterStr));
+     return filter;
+  }
+
   // Constructors ---------------------------------------------------
   
   public FilterImpl(final SimpleString str) throws MessagingException

Modified: trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -82,7 +82,7 @@
          throws Exception;
 
    @Operation(desc = "Remove all the messages from the queue", impact = ACTION)
-   void removeAllMessages() throws Exception;
+   int removeAllMessages() throws Exception;
 
    @Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
    boolean removeMessage(

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -202,8 +202,7 @@
    {
       try
       {
-         Filter filter = filterStr == null ? null : new FilterImpl(
-               new SimpleString(filterStr));
+         Filter filter = FilterImpl.createFilter(filterStr);
          List<MessageReference> refs = queue.list(filter);
          MessageInfo[] infos = new MessageInfo[refs.size()];
          for (int i = 0; i < refs.size(); i++)
@@ -230,11 +229,11 @@
       }
    }
 
-   public void removeAllMessages() throws Exception
+   public int removeAllMessages() throws Exception
    {
       try
       {
-         queue.deleteAllReferences(storageManager);
+         return queue.deleteAllReferences(storageManager);
       } catch (MessagingException e)
       {
          throw new IllegalStateException(e.getMessage());
@@ -254,13 +253,8 @@
    
    public int removeMatchingMessages(String filterStr) throws Exception
    {
-      Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
-      List<MessageReference> refs = queue.list(filter);
-      for (MessageReference ref : refs)
-      {
-         removeMessage(ref.getMessage().getMessageID());
-      }
-      return refs.size();
+      Filter filter = FilterImpl.createFilter(filterStr);
+      return queue.deleteMatchingReferences(filter, storageManager);
    }
 
    public boolean expireMessage(final long messageID) throws Exception
@@ -273,18 +267,8 @@
    {
       try
       {
-         Filter filter = null;
-         if (filterStr != null)
-         {
-            filter = new FilterImpl(new SimpleString(filterStr));
-         }
-         List<MessageReference> refs = queue.list(filter);
-         for (MessageReference ref : refs)
-         {
-            queue.expireMessage(ref.getMessage().getMessageID(),
-                  storageManager, postOffice, queueSettingsRepository);
-         }
-         return refs.size();
+         Filter filter = FilterImpl.createFilter(filterStr);
+         return queue.expireMessages(filter, storageManager, postOffice, queueSettingsRepository);
       } catch (MessagingException e)
       {
          throw new IllegalStateException(e.getMessage());
@@ -301,21 +285,20 @@
                + otherQueueName);
       }
 
-      return queue.moveMessage(messageID, binding, storageManager, postOffice);
+      return queue.moveMessage(messageID, binding.getAddress(), storageManager, postOffice);
    }
    
    public int moveMatchingMessages(String filterStr, String otherQueueName) throws Exception
    {
-      Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
-      List<MessageReference> refs = queue.list(filter);
-      synchronized (queue)
+      Filter filter = FilterImpl.createFilter(filterStr);
+      Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+      if (binding == null)
       {
-         for (MessageReference ref : refs)
-         {
-            moveMessage(ref.getMessage().getMessageID(), otherQueueName);
-         }
-         return refs.size();
+         throw new IllegalArgumentException("No queue found for "
+               + otherQueueName);
       }
+
+      return queue.moveMessages(filter, binding.getAddress(), storageManager, postOffice);
    }
    
    public int moveAllMessages(String otherQueueName) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessageReference.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/server/MessageReference.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -23,10 +23,11 @@
 package org.jboss.messaging.core.server;
 
 import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * A reference to a message.
@@ -71,10 +72,16 @@
    
    void expire(StorageManager storageManager, PostOffice postOffice,
          HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
-   
-   void move(Binding otherBinding, StorageManager persistenceManager, PostOffice postOffice) throws Exception;
 
+   void expire(Transaction tx,
+               StorageManager storageManager,
+               PostOffice postOffice,
+               HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
 
+   void move(SimpleString toAddress, StorageManager persistenceManager, PostOffice postOffice) throws Exception;
+
+   void move(SimpleString toAddress, Transaction tx, StorageManager persistenceManager, boolean expiry) throws Exception;
+
 }
 
 

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -28,7 +28,6 @@
 
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -108,16 +107,27 @@
    
    MessageReference getReference(long id);
    
-   void deleteAllReferences(StorageManager storageManager) throws Exception;
+   int deleteAllReferences(StorageManager storageManager) throws Exception;
 
    boolean deleteReference(long messageID, StorageManager storageManager)
          throws Exception;
 
+   int deleteMatchingReferences(Filter filter, StorageManager storageManager)
+         throws Exception;
+
    boolean expireMessage(long messageID, StorageManager storageManager,
          PostOffice postOffice,
          HierarchicalRepository<QueueSettings> queueSettingsRepository)
          throws Exception;
 
+   /**
+    * Flags all the messages in the queue which match the filter as <em>expired</em>
+    */
+   int expireMessages(Filter filter,
+                      StorageManager storageManager,
+                      PostOffice postOffice,
+                      HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+
    void expireMessages(final StorageManager storageManager,
                                 final PostOffice postOffice,
                                 final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
@@ -132,9 +142,11 @@
          HierarchicalRepository<QueueSettings> queueSettingsRepository)
          throws Exception;
 
-   boolean moveMessage(long messageID, Binding toBinding,
+   boolean moveMessage(long messageID, SimpleString toAddress,
          StorageManager storageManager, PostOffice postOffice) throws Exception;
 
+   int moveMessages(Filter filter, SimpleString toAddress, StorageManager storageManager, PostOffice postOffice) throws Exception;
+
    void setBackup();
    
    boolean activate();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -220,12 +220,51 @@
       }
 
    }
+   
+   public void expire(final Transaction tx,
+                      final StorageManager storageManager,
+                      final PostOffice postOffice,
+                      final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+   {
+      SimpleString expiryAddress = queueSettingsRepository.getMatch(queue.getName().toString()).getExpiryAddress();
 
-   public void move(final Binding otherBinding, final StorageManager persistenceManager, final PostOffice postOffice) throws Exception
+      if (expiryAddress != null)
+      {
+         List<Binding> bindingList = postOffice.getBindingsForAddress(expiryAddress);
+         
+         if (bindingList.isEmpty())
+         {
+            log.warn("Message has expired. No bindings for Expiry Address " + expiryAddress + " so dropping it");
+         }
+         else
+         {
+            move(expiryAddress, tx, storageManager, true);
+         }
+      }
+      else
+      {
+         log.warn("Message has expired. No expiry queue configured for queue " + queue.getName() + " so dropping it");
+
+         tx.addAcknowledgement(this);
+      }
+   }
+
+   public void move(final SimpleString toAddress, final StorageManager persistenceManager, final PostOffice postOffice) throws Exception
    {
-      move(otherBinding, persistenceManager, postOffice, false);
+      move(toAddress, persistenceManager, postOffice, false);
    }
+   
+   public void move(final SimpleString toAddress, final Transaction tx, final StorageManager persistenceManager, final boolean expiry) throws Exception
+   {
+      ServerMessage copyMessage = makeCopy(expiry, persistenceManager);
 
+      copyMessage.setDestination(toAddress);
+
+      tx.addMessage(copyMessage);
+
+      tx.addAcknowledgement(this);
+   }
+
    // Public --------------------------------------------------------
 
    public String toString()
@@ -241,24 +280,6 @@
 
    // Private -------------------------------------------------------
 
-   private void move(final Binding otherBinding,
-                     final StorageManager persistenceManager,
-                     final PostOffice postOffice,
-                     final boolean expiry) throws Exception
-   {
-      Transaction tx = new TransactionImpl(persistenceManager, postOffice);
-
-      ServerMessage copyMessage = makeCopy(expiry, persistenceManager);
-
-      copyMessage.setDestination(otherBinding.getAddress());
-
-      tx.addMessage(copyMessage);
-
-      tx.addAcknowledgement(this);
-
-      tx.commit();
-   }
-
    private void move(final SimpleString address,
                      final StorageManager persistenceManager,
                      final PostOffice postOffice,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -28,7 +28,6 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.DistributionPolicy;
@@ -41,8 +40,8 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.ConcurrentHashSet;
 
 /**
  * Implementation of a Queue TODO use Java 5 concurrent queue
@@ -403,8 +402,15 @@
       return messagesAdded.get();
    }
 
-   public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
+   public synchronized int deleteAllReferences(final StorageManager storageManager) throws Exception
    {
+      return deleteMatchingReferences(null, storageManager);
+   }
+   
+   public synchronized int deleteMatchingReferences(final Filter filter, final StorageManager storageManager) throws Exception
+   {
+      int count = 0;
+
       Transaction tx = new TransactionImpl(storageManager, postOffice);
 
       Iterator<MessageReference> iter = messageReferences.iterator();
@@ -413,22 +419,29 @@
       {
          MessageReference ref = iter.next();
 
-         deliveringCount.incrementAndGet();
-
-         tx.addAcknowledgement(ref);
-
-         iter.remove();
+         if (filter == null || filter.match(ref.getMessage()))
+         {
+            deliveringCount.incrementAndGet();
+            tx.addAcknowledgement(ref);
+            iter.remove();
+            count++;
+         }
       }
-
+      
       List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
       for (MessageReference messageReference : cancelled)
       {
-         deliveringCount.incrementAndGet();
-
-         tx.addAcknowledgement(messageReference);
+         if (filter == null || filter.match(messageReference.getMessage()))
+         {
+            deliveringCount.incrementAndGet();
+            tx.addAcknowledgement(messageReference);
+            count++;
+         }
       }
 
       tx.commit();
+      
+      return count;
    }
 
    public synchronized boolean deleteReference(final long messageID, final StorageManager storageManager) throws Exception
@@ -478,6 +491,32 @@
       return false;
    }
 
+   public int expireMessages(final Filter filter,
+                             final StorageManager storageManager,
+                             final PostOffice postOffice,
+                             final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+   {
+      Transaction tx = new TransactionImpl(storageManager, postOffice);
+
+      int count = 0;
+      Iterator<MessageReference> iter = messageReferences.iterator();
+
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+         if (filter == null || filter.match(ref.getMessage()))
+         {
+            deliveringCount.incrementAndGet();
+            ref.expire(tx, storageManager, postOffice, queueSettingsRepository);
+            iter.remove();
+            count++;
+         }
+      }
+      
+      tx.commit();
+
+      return count;
+   }
    
    public void expireMessages(final StorageManager storageManager,
                                 final PostOffice postOffice,
@@ -522,7 +561,7 @@
    }
 
    public boolean moveMessage(final long messageID,
-                              final Binding toBinding,
+                              final SimpleString toAddress,
                               final StorageManager storageManager,
                               final PostOffice postOffice) throws Exception
    {
@@ -534,14 +573,50 @@
          if (ref.getMessage().getMessageID() == messageID)
          {
             deliveringCount.incrementAndGet();
-            ref.move(toBinding, storageManager, postOffice);
+            ref.move(toAddress, storageManager, postOffice);
             iter.remove();
             return true;
          }
       }
       return false;
    }
+   
+   public synchronized int moveMessages(final Filter filter, final SimpleString toAddress, final StorageManager storageManager, final PostOffice postOffice) throws Exception
+   {
+      Transaction tx = new TransactionImpl(storageManager, postOffice);
 
+      int count = 0;
+      Iterator<MessageReference> iter = messageReferences.iterator();
+
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+         if (filter == null || filter.match(ref.getMessage()))
+         {
+            deliveringCount.incrementAndGet();
+            ref.move(toAddress, tx, storageManager, false);
+            iter.remove();
+            count++;
+         }
+      }
+      
+      List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+      for (MessageReference ref : cancelled)
+      {
+         if (filter == null || filter.match(ref.getMessage()))
+         {
+            deliveringCount.incrementAndGet();
+            ref.move(toAddress, tx, storageManager, false);
+            tx.addAcknowledgement(ref);
+            count++;
+         }
+      }
+
+      tx.commit();
+      
+      return count;
+   }
+
    public boolean changeMessagePriority(final long messageID,
                                         final byte newPriority,
                                         final StorageManager storageManager,

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/DestinationControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/DestinationControlMBean.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/DestinationControlMBean.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -22,6 +22,10 @@
 
 package org.jboss.messaging.jms.server.management;
 
+import static javax.management.MBeanOperationInfo.ACTION;
+
+import org.jboss.messaging.core.management.Operation;
+
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -44,6 +48,7 @@
 
    // Operations ----------------------------------------------------
 
-   public void removeAllMessages() throws Exception;
+   @Operation(desc = "Remove all the messages from the destination", impact = ACTION)
+   int removeAllMessages() throws Exception;
 
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -77,9 +77,6 @@
          @Parameter(name = "filter", desc = "A JMS Message filter") String filter)
          throws Exception;
 
-   @Operation(desc = "Remove all the messages from the queue", impact = ACTION)
-   void removeAllMessages() throws Exception;
-
    @Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
    boolean removeMessage(
          @Parameter(name = "messageID", desc = "A message ID") String messageID)

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -36,6 +36,7 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.DayCounterInfo;
 import org.jboss.messaging.core.management.MessageCounterInfo;
 import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
@@ -50,7 +51,6 @@
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.jms.JBossDestination;
 import org.jboss.messaging.jms.JBossQueue;
 import org.jboss.messaging.jms.client.JBossMessage;
@@ -84,6 +84,12 @@
 
    // Static --------------------------------------------------------
 
+   public static Filter createFilterFromJMSSelector(final String selectorStr) throws MessagingException
+   {
+      String filterStr = (selectorStr == null) ? null : SelectorTranslator.convertToJBMFilterString(selectorStr);
+      return FilterImpl.createFilter(filterStr);
+   }
+   
    private static Filter createFilterForJMSMessageID(String jmsMessageID)
          throws Exception
    {
@@ -227,25 +233,17 @@
    {
       try
       {
-         Filter filter = filterStr == null ? null : new FilterImpl(
-               new SimpleString(SelectorTranslator
-                     .convertToJBMFilterString(filterStr)));
-
-         List<MessageReference> refs = coreQueue.list(filter);
-         for (MessageReference ref : refs)
-         {
-            coreQueue.deleteReference(ref.getMessage().getMessageID(), storageManager);
-         }
-         return refs.size();
+         Filter filter = createFilterFromJMSSelector(filterStr);
+         return coreQueue.deleteMatchingReferences(filter, storageManager);
       } catch (MessagingException e)
       {
          throw new IllegalStateException(e.getMessage());
       }
    }
    
-   public void removeAllMessages() throws Exception
+   public int removeAllMessages() throws Exception
    {
-      coreQueue.deleteAllReferences(storageManager);
+      return coreQueue.deleteAllReferences(storageManager);
    }
 
    public TabularData listAllMessages() throws Exception
@@ -257,9 +255,7 @@
    {
       try
       {
-         Filter filter = filterStr == null ? null : new FilterImpl(
-               new SimpleString(SelectorTranslator
-                     .convertToJBMFilterString(filterStr)));
+         Filter filter = createFilterFromJMSSelector(filterStr);
 
          List<MessageReference> messageRefs = coreQueue.list(filter);
          List<JMSMessageInfo> infos = new ArrayList<JMSMessageInfo>(messageRefs
@@ -294,9 +290,7 @@
    {
       try
       {
-         Filter filter = filterStr == null ? null : new FilterImpl(
-               new SimpleString(SelectorTranslator
-                     .convertToJBMFilterString(filterStr)));
+         Filter filter = createFilterFromJMSSelector(filterStr);
 
          List<MessageReference> refs = coreQueue.list(filter);
          for (MessageReference ref : refs)
@@ -354,21 +348,20 @@
                + otherQueueName);
       }
 
-      return coreQueue.moveMessage(messageID, binding, storageManager, postOffice);
+      return coreQueue.moveMessage(messageID, binding.getAddress(), storageManager, postOffice);
    }
    
    public int moveMatchingMessages(String filterStr, String otherQueueName) throws Exception
    {
-      Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
-      List<MessageReference> refs = coreQueue.list(filter);
-      synchronized (coreQueue)
+      Binding otherBinding = postOffice.getBinding(new SimpleString(otherQueueName));
+      if (otherBinding == null)
       {
-         for (MessageReference ref : refs)
-         {
-            moveMessage(ref.getMessage().getMessageID(), otherQueueName);
-         }
-         return refs.size();
+         throw new IllegalArgumentException("No queue found for "
+               + otherQueueName);
       }
+
+      Filter filter = createFilterFromJMSSelector(filterStr);
+      return coreQueue.moveMessages(filter, otherBinding.getAddress(), storageManager, postOffice);
    }
    
    public int moveAllMessages(String otherQueueName) throws Exception

Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -207,16 +207,19 @@
       return JMSMessageInfo.toTabularData(infos);
    }
 
-   public void removeAllMessages() throws Exception
+   public int removeAllMessages() throws Exception
    {
+      int count = 0;
       List<Binding> bindings = postOffice.getBindingsForAddress(managedTopic
             .getSimpleAddress());
 
       for (Binding binding : bindings)
       {
          Queue queue = binding.getQueue();
-         queue.deleteAllReferences(storageManager);
+         count += queue.deleteAllReferences(storageManager);
       }
+      
+      return count;
    }
    
    public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception

Added: trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -0,0 +1,343 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.management;
+
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServerInvocationHandler;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.management.QueueControlMBean;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A QueueControlTest
+ *
+ * @author jmesnil
+ * 
+ * Created 26 nov. 2008 14:18:48
+ *
+ *
+ */
+public class QueueControlTest extends TestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private MessagingService service;
+
+   // Static --------------------------------------------------------
+
+   private static QueueControlMBean createQueueControl(SimpleString address, SimpleString name) throws Exception
+   {
+      QueueControlMBean queueControl = (QueueControlMBean)MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(),
+                                                                                                        ManagementServiceImpl.getQueueObjectName(address,
+                                                                                                                                                 name),
+                                                                                                        QueueControlMBean.class,
+                                                                                                        false);
+      return queueControl;
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * <ol>
+    * <li>send a message to queue</li>
+    * <li>move all messages from queue to otherQueue using management method</li>
+    * <li>check there is no message to consume from queue</li>
+    * <li>consume the message from otherQueue</li>
+    * </ol>
+    */
+   public void testMoveAllMessages() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      ClientSession session = sf.createSession(false, true, true);
+
+      SimpleString address = randomSimpleString();
+      SimpleString queue = randomSimpleString();
+      SimpleString otherAddress = randomSimpleString();
+      SimpleString otherQueue = randomSimpleString();
+
+      session.createQueue(address, queue, null, false, true, true);
+      session.createQueue(otherAddress, otherQueue, null, false, true, true);
+      ClientProducer producer = session.createProducer(address);
+      session.start();
+
+      // send on queue
+      ClientMessage message = session.createClientMessage(false);
+      SimpleString key = randomSimpleString();
+      long value = randomLong();
+      message.putLongProperty(key, value);
+      producer.send(message);
+
+      // wait a little bit to ensure the message is handled by the server
+      Thread.sleep(100);
+      QueueControlMBean queueControl = createQueueControl(address, queue);
+      assertEquals(1, queueControl.getMessageCount());
+
+      // moved all messages to otherQueue
+      int movedMessagesCount = queueControl.moveAllMessages(otherQueue.toString());
+      assertEquals(1, movedMessagesCount);
+      assertEquals(0, queueControl.getMessageCount());
+
+      // check there is no message to consume from queue
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientMessage m = consumer.receive(500);
+      assertNull(m);
+
+      // consume the message from otherQueue
+      ClientConsumer otherConsumer = session.createConsumer(otherQueue);
+      m = otherConsumer.receive(500);
+      assertEquals(value, m.getProperty(key));
+
+      m.acknowledge();
+
+      consumer.close();
+      session.deleteQueue(queue);
+      otherConsumer.close();
+      session.deleteQueue(otherQueue);
+      session.close();
+   }
+
+   /**
+    * <ol>
+    * <li>send 2 messages to queue</li>
+    * <li>move messages from queue to otherQueue using management method <em>with filter</em></li>
+    * <li>consume the message which <strong>does not</strong> match the filter from queue</li>
+    * <li>consume the message which <strong>does</strong> match the filter from otherQueue</li>
+    * </ol>
+    */
+
+   public void testMoveMatchingMessages() throws Exception
+   {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      ClientSession session = sf.createSession(false, true, true);
+
+      SimpleString address = randomSimpleString();
+      SimpleString queue = randomSimpleString();
+      SimpleString otherAddress = randomSimpleString();
+      SimpleString otherQueue = randomSimpleString();
+
+      session.createQueue(address, queue, null, false, true, true);
+      session.createQueue(otherAddress, otherQueue, null, false, true, true);
+      ClientProducer producer = session.createProducer(address);
+      session.start();
+
+      // send on queue
+      ClientMessage matchingMessage = session.createClientMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      producer.send(matchingMessage);
+      ClientMessage unmatchingMessage = session.createClientMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(unmatchingMessage);
+
+      // wait a little bit to ensure the message is handled by the server
+      Thread.sleep(100);
+      QueueControlMBean queueControl = createQueueControl(address, queue);
+      assertEquals(2, queueControl.getMessageCount());
+
+      // moved matching messages to otherQueue
+      int movedMatchedMessagesCount = queueControl.moveMatchingMessages(key + " =" + matchingValue, otherQueue.toString());
+      assertEquals(1, movedMatchedMessagesCount);
+      assertEquals(1, queueControl.getMessageCount());
+
+      // consume the unmatched message from queue
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientMessage m = consumer.receive(500);
+      assertNotNull(m);
+      assertEquals(unmatchingValue, m.getProperty(key));
+      
+      // consume the matched message from otherQueue
+      ClientConsumer otherConsumer = session.createConsumer(otherQueue);
+      m = otherConsumer.receive(500);
+      assertNotNull(m);
+      assertEquals(matchingValue, m.getProperty(key));
+
+      m.acknowledge();
+
+      consumer.close();
+      session.deleteQueue(queue);
+      otherConsumer.close();
+      session.deleteQueue(otherQueue);
+      session.close();
+   }
+
+   /**
+    * <ol>
+    * <li>send 2 messages to queue</li>
+    * <li>remove all messages using management method</li>
+    * <li>check the removed count is 2 and the queue message count is 0</li>
+    * <li>check there is no message to consume from queue</li>
+    * </ol>
+    */
+   public void testRemoveAllMessages() throws Exception
+   {
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      ClientSession session = sf.createSession(false, true, true);
+
+      SimpleString address = randomSimpleString();
+      SimpleString queue = randomSimpleString();
+
+      session.createQueue(address, queue, null, false, true, true);
+      ClientProducer producer = session.createProducer(address);
+      session.start();
+
+      // send 2 messages on queue
+      producer.send(session.createClientMessage(false));
+      producer.send(session.createClientMessage(false));
+
+      // wait a little bit to ensure the message is handled by the server
+      Thread.sleep(100);
+      QueueControlMBean queueControl = createQueueControl(address, queue);
+      assertEquals(2, queueControl.getMessageCount());
+
+      // delete all messages
+      int deletedMessagesCount = queueControl.removeAllMessages();
+      assertEquals(2, deletedMessagesCount);
+      assertEquals(0, queueControl.getMessageCount());
+
+      // check there is no message to consume from queue
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientMessage m = consumer.receive(500);
+      assertNull(m);
+
+      consumer.close();
+      session.deleteQueue(queue);
+      session.close();
+   }
+   
+   /**
+    * <ol>
+    * <li>send 2 messages to queue</li>
+    * <li>remove messages from queue using management method <em>with filter</em></li>
+    * <li>check there is only one message to consume from queue</li>
+    * </ol>
+    */
+
+   public void testRemoveMatchingMessages() throws Exception
+   {
+      SimpleString key = new SimpleString("key");
+      long matchingValue = randomLong();
+      long unmatchingValue = matchingValue + 1;
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+      ClientSession session = sf.createSession(false, true, true);
+
+      SimpleString address = randomSimpleString();
+      SimpleString queue = randomSimpleString();
+
+      session.createQueue(address, queue, null, false, true, true);
+      ClientProducer producer = session.createProducer(address);
+      session.start();
+
+      // send on queue
+      ClientMessage matchingMessage = session.createClientMessage(false);
+      matchingMessage.putLongProperty(key, matchingValue);
+      producer.send(matchingMessage);
+      ClientMessage unmatchingMessage = session.createClientMessage(false);
+      unmatchingMessage.putLongProperty(key, unmatchingValue);
+      producer.send(unmatchingMessage);
+
+      // wait a little bit to ensure the message is handled by the server
+      Thread.sleep(100);
+      QueueControlMBean queueControl = createQueueControl(address, queue);
+      assertEquals(2, queueControl.getMessageCount());
+
+      // remove matching messages from queue
+      int removedMatchedMessagesCount = queueControl.removeMatchingMessages(key + " =" + matchingValue);
+      assertEquals(1, removedMatchedMessagesCount);
+      assertEquals(1, queueControl.getMessageCount());
+
+      // consume the unmatched message from queue
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientMessage m = consumer.receive(500);
+      assertNotNull(m);
+      assertEquals(unmatchingValue, m.getProperty(key));
+
+      m.acknowledge();
+
+      // check there is no other message to consume:
+      m = consumer.receive(500);
+      assertNull(m);
+
+
+      consumer.close();
+      session.deleteQueue(queue);
+      session.close();
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+
+      Configuration conf = new ConfigurationImpl();
+      conf.setSecurityEnabled(false);
+      conf.setJMXManagementEnabled(true);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      service = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+      service.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      service.stop();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -31,6 +31,7 @@
 import static org.jboss.messaging.tests.util.RandomUtil.randomBoolean;
 import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
 import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomPositiveInt;
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
 
@@ -470,7 +471,7 @@
       expect(queue.getName()).andReturn(new SimpleString(name));
       expect(binding.getQueue()).andReturn(queue);
       expect(postOffice.getBinding(new SimpleString(name))).andReturn(binding);
-      queue.deleteAllReferences(storageManager);
+      expect(queue.deleteAllReferences(storageManager)).andReturn(randomPositiveInt());
       expect(postOffice.removeBinding(new SimpleString(name))).andReturn(
             binding);
       replayMockedAttributes();

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.tests.unit.core.management.impl;
 
 import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.isA;
@@ -32,6 +33,7 @@
 import static org.jboss.messaging.tests.util.RandomUtil.randomByte;
 import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
 import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomPositiveInt;
 import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
@@ -318,12 +320,13 @@
 
    public void testRemoveAllMessages() throws Exception
    {
-      queue.deleteAllReferences(storageManager);
+      int messageRemoved = randomPositiveInt();
+      expect(queue.deleteAllReferences(storageManager)).andReturn(messageRemoved);
 
       replayMockedAttributes();
 
       QueueControlMBean control = createControl();
-      control.removeAllMessages();
+      assertEquals(messageRemoved,control.removeAllMessages());
 
       verifyMockedAttributes();
    }
@@ -458,46 +461,27 @@
 
    public void testExpireMessagesWithFilter() throws Exception
    {
-      long messageID_1 = randomLong();
-      long messageID_2 = randomLong();
+      int expiredMessagesCount = randomPositiveInt();
 
-      List<MessageReference> refs = new ArrayList<MessageReference>();
-      MessageReference ref_1 = createMock(MessageReference.class);
-      ServerMessage message_1 = createMock(ServerMessage.class);
-      expect(message_1.getMessageID()).andStubReturn(messageID_1);
-      expect(ref_1.getMessage()).andReturn(message_1);
-      MessageReference ref_2 = createMock(MessageReference.class);
-      ServerMessage message_2 = createMock(ServerMessage.class);
-      expect(message_2.getMessageID()).andStubReturn(messageID_2);
-      expect(ref_2.getMessage()).andReturn(message_2);
-      refs.add(ref_1);
-      refs.add(ref_2);
-      expect(queue.list(isA(Filter.class))).andReturn(refs);
-      expect(
-            queue.expireMessage(messageID_1, storageManager, postOffice,
-                  repository)).andReturn(true);
-      expect(
-            queue.expireMessage(messageID_2, storageManager, postOffice,
-                  repository)).andReturn(true);
-
+      expect(queue.expireMessages(isA(Filter.class), eq(storageManager), eq(postOffice), eq(repository))).andReturn(expiredMessagesCount);
       replayMockedAttributes();
-      replay(ref_1, ref_2, message_1, message_2);
 
       QueueControlMBean control = createControl();
-      assertEquals(2, control.expireMessages("foo = true"));
+      assertEquals(expiredMessagesCount, control.expireMessages("foo = true"));
 
       verifyMockedAttributes();
-      verify(ref_1, ref_2, message_1, message_2);
    }
 
    public void testMoveMessage() throws Exception
    {
       long messageID = randomLong();
       SimpleString otherQueueName = randomSimpleString();
+      SimpleString otherAddress = randomSimpleString();
       Binding otherBinding = createMock(Binding.class);
+      expect(otherBinding.getAddress()).andReturn(otherAddress);
       expect(postOffice.getBinding(otherQueueName)).andReturn(otherBinding);
       expect(
-            queue.moveMessage(messageID, otherBinding, storageManager,
+            queue.moveMessage(messageID, otherAddress, storageManager,
                   postOffice)).andReturn(true);
 
       replayMockedAttributes();
@@ -534,10 +518,12 @@
    {
       long messageID = randomLong();
       SimpleString otherQueueName = randomSimpleString();
+      SimpleString otherAddress = randomSimpleString();
       Binding otherBinding = createMock(Binding.class);
+      expect(otherBinding.getAddress()).andReturn(otherAddress);
       expect(postOffice.getBinding(otherQueueName)).andReturn(otherBinding);
       expect(
-            queue.moveMessage(messageID, otherBinding, storageManager,
+            queue.moveMessage(messageID, otherAddress, storageManager,
                   postOffice)).andReturn(false);
 
       replayMockedAttributes();

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -337,7 +337,7 @@
 
       EasyMock.replay(queue, toBinding, toQueue, postOffice, persistenceManager, serverMessage, copyMessage, pm);
       
-      messageReference.move(toBinding, persistenceManager, postOffice);
+      messageReference.move(toAddress, persistenceManager, postOffice);
       
       EasyMock.verify(queue, toBinding, toQueue, postOffice, persistenceManager, serverMessage, copyMessage, pm);
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -1410,7 +1410,7 @@
       assertEquals(0, queue.getDeliveringCount());
       assertTrue(queue.getSizeBytes() > 0);
       
-      queue.moveMessage(messageID, toBinding, storageManager, postOffice);
+      queue.moveMessage(messageID, toQueueName, storageManager, postOffice);
       
       assertEquals(0, queue.getMessageCount());
       assertEquals(0, queue.getDeliveringCount());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -31,6 +31,7 @@
 import static org.jboss.messaging.tests.util.RandomUtil.randomByte;
 import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
 import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomPositiveInt;
 import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
@@ -56,6 +57,7 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.jms.JBossQueue;
 import org.jboss.messaging.jms.server.management.impl.JMSQueueControl;
+import org.jboss.messaging.tests.util.RandomUtil;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -301,12 +303,13 @@
 
    public void testRemoveAllMessages() throws Exception
    {
-      coreQueue.deleteAllReferences(storageManager);
+      int removedMessagesCount = randomPositiveInt();
+     expect(coreQueue.deleteAllReferences(storageManager)).andReturn(removedMessagesCount);
 
       replayMockedAttributes();
 
       JMSQueueControl control = createControl();
-      control.removeAllMessages();
+      assertEquals(removedMessagesCount, control.removeAllMessages());
 
       verifyMockedAttributes();
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -27,6 +27,7 @@
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
+import static org.jboss.messaging.tests.util.RandomUtil.randomPositiveInt;
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
 import java.util.ArrayList;
@@ -45,6 +46,7 @@
 import org.jboss.messaging.jms.JBossTopic;
 import org.jboss.messaging.jms.server.management.SubscriptionInfo;
 import org.jboss.messaging.jms.server.management.impl.TopicControl;
+import org.jboss.messaging.tests.util.RandomUtil;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -244,6 +246,8 @@
    {
       String jndiBinding = randomString();
       String name = randomString();
+      int removedMessagesFromQueue1 = randomPositiveInt();
+      int removedMessagesFromQueue2 = randomPositiveInt();
 
       JBossTopic topic = new JBossTopic(name);
       PostOffice postOffice = createMock(PostOffice.class);
@@ -262,15 +266,15 @@
       bindings.add(bindingForQueue_2);
       expect(postOffice.getBindingsForAddress(topic.getSimpleAddress()))
             .andStubReturn(bindings);
-      queue_1.deleteAllReferences(storageManager);
-      queue_2.deleteAllReferences(storageManager);
+      expect(queue_1.deleteAllReferences(storageManager)).andReturn(removedMessagesFromQueue1);
+      expect(queue_2.deleteAllReferences(storageManager)).andReturn(removedMessagesFromQueue2);
 
       replay(postOffice, storageManager, bindingforQueue_1, queue_1,
             bindingForQueue_2, queue_2);
 
       TopicControl control = new TopicControl(topic, jndiBinding, postOffice,
             storageManager);
-      control.removeAllMessages();
+      assertEquals(removedMessagesFromQueue1 + removedMessagesFromQueue2, control.removeAllMessages());
 
       verify(postOffice, storageManager, bindingforQueue_1, queue_1,
             bindingForQueue_2, queue_2);

Modified: trunk/tests/src/org/jboss/messaging/tests/util/RandomUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/RandomUtil.java	2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/util/RandomUtil.java	2008-11-27 15:59:16 UTC (rev 5439)
@@ -74,8 +74,7 @@
    
    public static int randomPositiveInt()
    {
-      final int value = randomInt();
-      return value >= 0 ? value : value * -1;
+      return Math.abs(randomInt());
    }
    
    public static short randomShort()




More information about the jboss-cvs-commits mailing list