[jboss-cvs] JBoss Messaging SVN: r3734 - in trunk: src/main/org/jboss/jms/server and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Feb 19 11:52:32 EST 2008


Author: ataylor
Date: 2008-02-19 11:52:32 -0500 (Tue, 19 Feb 2008)
New Revision: 3734

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java
Modified:
   trunk/src/main/org/jboss/jms/destination/JBossQueue.java
   trunk/src/main/org/jboss/jms/destination/JBossTopic.java
   trunk/src/main/org/jboss/jms/server/ClientInfo.java
   trunk/src/main/org/jboss/jms/server/JMSServerManager.java
   trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
   trunk/src/main/org/jboss/messaging/core/PriorityLinkedList.java
   trunk/src/main/org/jboss/messaging/core/Queue.java
   trunk/src/main/org/jboss/messaging/core/impl/PriorityLinkedListImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
   trunk/tests/src/org/jboss/test/messaging/JBMServerTestCase.java
Log:
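More refactoring of the management interfaces: JMSServerManager now takes destination names rather than JBossQueue/JBossTopic objects, and gains operations to list subscription messages and to remove or move individual messages.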

Modified: trunk/src/main/org/jboss/jms/destination/JBossQueue.java
===================================================================
--- trunk/src/main/org/jboss/jms/destination/JBossQueue.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/jms/destination/JBossQueue.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -50,7 +50,7 @@
       super(JMS_QUEUE_ADDRESS_PREFIX + name, name);
    }
 
-   public JBossQueue(String address, String name)
+   protected JBossQueue(String address, String name)
    {
       super(address, name);
    }

Modified: trunk/src/main/org/jboss/jms/destination/JBossTopic.java
===================================================================
--- trunk/src/main/org/jboss/jms/destination/JBossTopic.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/jms/destination/JBossTopic.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -55,7 +55,7 @@
       super(JMS_TOPIC_ADDRESS_PREFIX + name, name);
    }
 
-   public JBossTopic(String address, String name)
+   protected JBossTopic(String address, String name)
    {
       super(address, name);
    }

Modified: trunk/src/main/org/jboss/jms/server/ClientInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ClientInfo.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/jms/server/ClientInfo.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -32,7 +32,7 @@
 {
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("HH:mm:ss, EEE, MMM d, yyyy");
 
-   enum status{ STARTED, STOPPED }
+   public enum status{ STARTED, STOPPED }
 
    private String user;
    private String address;

Modified: trunk/src/main/org/jboss/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManager.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManager.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -10,15 +10,17 @@
 
 /**
  * A JMS Management interface.
+ *
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  */
 public interface JMSServerManager
 {
    // management operations
-   enum ListType
+   public enum ListType
    {
       ALL, DURABLE, NON_DURABLE
    }
+
    boolean isStarted();
 
    boolean createQueue(String queueName, String jndiBinding) throws Exception;
@@ -35,79 +37,49 @@
 
    boolean destroyConnectionFactory(String name) throws Exception;
 
-   public List<Message> listMessagesForQueue(String queue);
+   public List<Message> listMessagesForQueue(String queue) throws Exception;
 
-   public List<Message> listMessagesForQueue(String queue, ListType listType);
+   public List<Message> listMessagesForQueue(String queue, ListType listType) throws Exception;
 
-   public List<Message> listMessages(JBossQueue queue);
+   public List<Message> listMessagesForSubscription(String subscription) throws Exception;
 
-   public List<Message> listMessagesForTopic(String topic);
+   public List<Message> listMessagesForSubscription(String subscription, ListType listType) throws Exception;
 
-   public List<Message> listMessagesForTopic(String topic, ListType listType);
+   void removeMessageFromQueue(String queueName, String messageId) throws Exception;
 
-   public List<Message> listMessages(JBossTopic topic);
+   void removeMessageFromTopic(String topicName, String messageId) throws Exception;
 
-   public List<Message> listMessages(JBossQueue queue, ListType listType);
-
-   public List<Message> listMessages(JBossTopic topic, ListType listType);
-
    void removeAllMessagesForQueue(String queueName) throws Exception;
 
    void removeAllMessagesForTopic(String topicName) throws Exception;
 
-   void removeAllMessages(JBossQueue queueName) throws Exception;
+   void moveMessage(String fromQueue, String toQueue, String messageID) throws Exception;
 
-   void removeAllMessages(JBossTopic topicName) throws Exception;
-
    int getMessageCountForQueue(String queue) throws Exception;
 
-   int getMessageCount(JBossQueue queue) throws Exception;
-
-   int getMessageCountForTopic(String topic) throws Exception;
-
-   int getMessageCount(JBossTopic topic) throws Exception;
-
    List<SubscriptionInfo> listSubscriptions(String topicName) throws Exception;
 
-   List<SubscriptionInfo> listSubscriptions(JBossTopic topic) throws Exception;
-
    List<SubscriptionInfo> listSubscriptions(String topicName, ListType listType) throws Exception;
 
-   List<SubscriptionInfo> listSubscriptions(JBossTopic topic, ListType listType) throws Exception;
-
    int getSubscriptionsCountForTopic(String topicName) throws Exception;
 
-   int getSubscriptionsCount(JBossTopic topic) throws Exception;
-
    int getSubscriptionsCountForTopic(String topicName, ListType listType) throws Exception;
 
-   int getSubscriptionsCount(JBossTopic topic,ListType listType) throws Exception;
-
    int getConsumerCountForQueue(String queue) throws Exception;
 
-   int getConsumerCountForQueue(JBossQueue queue) throws Exception;
-
    List<ClientInfo> getClients() throws Exception;
 
    void startGatheringStatistics();
 
    void startGatheringStatisticsForQueue(String queue);
 
-   void startGatheringStatistics(JBossQueue queue);
-
    void startGatheringStatisticsForTopic(String topic);
 
-   void startGatheringStatistics(JBossTopic topic);
-
    void stopGatheringStatistics();
 
    void stopGatheringStatisticsForQueue(String queue);
 
-   void stopGatheringStatistics(JBossQueue queue);
-
    void stopGatheringStatisticsForTopic(String topic);
 
-   void stopGatheringStatistics(JBossTopic topic);
-
    List<MessageStatistics> getStatistics() throws Exception;
 }

Modified: trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/jms/server/JMSServerManagerImpl.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -29,9 +29,12 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.MessagingServerManagement;
 import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.impl.server.SubscriptionInfo;
 import org.jboss.jms.server.MessageStatistics;
+import org.jboss.jms.message.JBossMessage;
 import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
 import org.jboss.messaging.deployers.Deployer;
 import org.jboss.messaging.deployers.DeploymentManager;
 import org.jboss.messaging.util.JNDIUtil;
@@ -294,7 +297,6 @@
 
    // management operations
 
-
    // management operations
 
    public boolean isStarted()
@@ -318,7 +320,12 @@
    {
       JBossTopic jBossTopic = new JBossTopic(topicName);
       messagingServerManagement.addAddress(jBossTopic.getAddress());
-      return bindToJndi(jndiBinding, jBossTopic);
+      boolean added = bindToJndi(jndiBinding, jBossTopic);
+      if (added)
+      {
+         addToDestinationBindings(topicName, jndiBinding);
+      }
+      return added;
    }
 
    public boolean destroyQueue(String name) throws Exception
@@ -363,11 +370,11 @@
          log.debug(this + " created local connectionFactory " + clientConnectionFactory);
          cf = new JBossConnectionFactory(clientConnectionFactory, clientID, dupsOKBatchSize);
       }
-      if(!bindToJndi(jndiBinding, cf))
+      if (!bindToJndi(jndiBinding, cf))
       {
-          return false;
+         return false;
       }
-      if(connectionFactoryBindings.get(name) == null)
+      if (connectionFactoryBindings.get(name) == null)
       {
          connectionFactoryBindings.put(name, new ArrayList<String>());
       }
@@ -388,6 +395,11 @@
       for (String jndiBinding : jndiBindings)
       {
          bindToJndi(jndiBinding, cf);
+         if (connectionFactoryBindings.get(name) == null)
+         {
+            connectionFactoryBindings.put(name, new ArrayList<String>());
+         }
+         connectionFactoryBindings.get(name).add(jndiBinding);
       }
       return true;
    }
@@ -409,46 +421,36 @@
    }
 
 
-   public List<Message> listMessagesForQueue(String queue)
+   public List<Message> listMessagesForQueue(String queue) throws Exception
    {
       return listMessagesForQueue(queue, ListType.ALL);
    }
 
-   public List<Message> listMessagesForQueue(String queue, ListType listType)
+   public List<Message> listMessagesForQueue(String queue, ListType listType) throws Exception
    {
-      return listMessages(new JBossQueue(queue), listType);
+      return listMessages(new JBossQueue(queue).getAddress(), listType);
    }
 
-   public List<Message> listMessages(JBossQueue queue)
+   public List<Message> listMessagesForSubscription(String subscription) throws Exception
    {
-      return listMessages(queue, ListType.ALL);
+      return listMessagesForSubscription(subscription, ListType.ALL);
    }
 
-   public List<Message> listMessagesForTopic(String topic)
+   public List<Message> listMessagesForSubscription(String subscription, ListType listType) throws Exception
    {
-      return listMessagesForTopic(topic, ListType.ALL);
+      return listMessages(subscription, listType);
    }
 
-   public List<Message> listMessagesForTopic(String topic, ListType listType)
+   public void removeMessageFromQueue(String queueName, String messageId) throws Exception
    {
-      return listMessages(new JBossTopic(topic), listType);
+      messagingServerManagement.removeMessageForBinding(new JBossQueue(queueName).getAddress(), new FilterImpl("JMSMessageID='" + messageId + "'"));
    }
 
-   public List<Message> listMessages(JBossTopic topic)
+   public void removeMessageFromTopic(String topicName, String messageId) throws Exception
    {
-      return listMessages(topic, ListType.ALL);
+      messagingServerManagement.removeMessageForAddress(new JBossTopic(topicName).getAddress(), new FilterImpl("JMSMessageID='" + messageId + "'"));
    }
 
-   public List<Message> listMessages(JBossQueue queue, ListType listType)
-   {
-      return null;  //todo
-   }
-
-   public List<Message> listMessages(JBossTopic topic, ListType listType)
-   {
-      return null;  //todo
-   }
-
    public void removeAllMessagesForQueue(String queueName) throws Exception
    {
       JBossQueue jBossQueue = new JBossQueue(queueName);
@@ -461,110 +463,42 @@
       removeAllMessages(jBossTopic);
    }
 
-   public void removeAllMessages(JBossQueue queue) throws Exception
+   public void moveMessage(String fromQueue, String toQueue, String messageId) throws Exception
    {
-      messagingServerManagement.removeAllMessagesForAddress(queue.getAddress());
+      messagingServerManagement.moveMessages(new JBossQueue(fromQueue).getAddress(), new JBossQueue(toQueue).getAddress(),
+              new FilterImpl("JMSMessageID='" + messageId + "'"));
    }
 
-   public void removeAllMessages(JBossTopic topic) throws Exception
-   {
-      messagingServerManagement.removeAllMessagesForAddress(topic.getAddress());
-   }
-
    public int getMessageCountForQueue(String queue) throws Exception
    {
       return getMessageCount(new JBossQueue(queue));
    }
 
-   public int getMessageCount(JBossQueue queue) throws Exception
-   {
-      return messagingServerManagement.getMessageCountForQueue(queue.getAddress());
-   }
-
-   public int getMessageCountForTopic(String topic) throws Exception
-   {
-      return 0;  //To change body of implemented methods use File | Settings | File Templates.
-   }
-
-   public int getMessageCount(JBossTopic topic) throws Exception
-   {
-      return 0;  //To change body of implemented methods use File | Settings | File Templates.
-   }
-
    public List<SubscriptionInfo> listSubscriptions(String topicName) throws Exception
    {
       return listSubscriptions(new JBossTopic(topicName));
    }
 
-   public List<SubscriptionInfo> listSubscriptions(JBossTopic topic) throws Exception
-   {
-      return listSubscriptions(topic, ListType.ALL);
-   }
-
    public List<SubscriptionInfo> listSubscriptions(String topic, ListType type) throws Exception
    {
       return listSubscriptions(new JBossTopic(topic), type);
    }
-   public List<SubscriptionInfo> listSubscriptions(JBossTopic topic, ListType type) throws Exception
-   {
-      List<SubscriptionInfo> subs = new ArrayList<SubscriptionInfo>();
 
-      List<Queue> queues = messagingServerManagement.getQueuesForAddress(topic.getAddress());
-
-      for (Queue queue : queues)
-      {
-         if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable()) || (type == ListType.NON_DURABLE && !queue.isDurable()))
-         {
-            String subName = null;
-            String clientID = null;
-
-            if (queue.isDurable())
-            {
-               MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
-               subName = helper.getSubName();
-               clientID = helper.getClientId();
-            }
-
-            SubscriptionInfo info = new SubscriptionInfo(queue.getName(), queue.isDurable(), subName, clientID,
-                    queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.getMessageCount(), queue.getMaxSize());
-
-            subs.add(info);
-         }
-      }
-
-      return subs;
-   }
-
    public int getSubscriptionsCountForTopic(String topicName) throws Exception
    {
       return getSubscriptionsCount(new JBossTopic(topicName));
    }
 
-   public int getSubscriptionsCount(JBossTopic topic) throws Exception
-   {
-      return getSubscriptionsCount(topic, ListType.ALL);
-   }
-
    public int getSubscriptionsCountForTopic(String topicName, ListType listType) throws Exception
    {
       return getSubscriptionsCount(new JBossTopic(topicName), listType);
    }
 
-   public int getSubscriptionsCount(JBossTopic topic, ListType listType) throws Exception
-   {
-       return listSubscriptions(topic, listType).size();
-   }
-
    public int getConsumerCountForQueue(String queue) throws Exception
    {
-      return getConsumerCountForQueue(new JBossQueue(queue));
+      return getConsumerCount(new JBossQueue(queue));
    }
 
-   public int getConsumerCountForQueue(JBossQueue queue) throws Exception
-   {
-       return messagingServerManagement.getConsumerCountForQueue(queue.getName());
-   }
-
    public List<ClientInfo> getClients() throws Exception
    {
       List<ClientInfo> clientInfos = new ArrayList<ClientInfo>();
@@ -662,4 +596,96 @@
       destinations.get(destination).add(jndiBinding);
    }
 
+
+   private List<Message> listMessages(String queue, ListType listType) throws Exception
+   {
+      List<Message> messages = new ArrayList<Message>();
+      Filter filter = null;
+      switch (listType)
+      {
+         case DURABLE:
+            filter = new FilterImpl("JBMDurable='DURABLE'");
+            break;
+         case NON_DURABLE:
+            filter = new FilterImpl("JBMDurable='NON_DURABLE'");
+            break;
+      }
+      List<org.jboss.messaging.core.Message> messageList = messagingServerManagement.listMessages(queue, filter);
+      for (org.jboss.messaging.core.Message message : messageList)
+      {
+         messages.add(JBossMessage.createMessage(message, null));
+      }
+      return messages;
+   }
+
+
+   private void removeAllMessages(JBossQueue queue) throws Exception
+   {
+      messagingServerManagement.removeAllMessagesForAddress(queue.getAddress());
+   }
+
+   private void removeAllMessages(JBossTopic topic) throws Exception
+   {
+      messagingServerManagement.removeAllMessagesForAddress(topic.getAddress());
+   }
+
+   private int getMessageCount(JBossQueue queue) throws Exception
+   {
+      return messagingServerManagement.getMessageCountForQueue(queue.getAddress());
+   }
+
+   private int getMessageCount(JBossTopic topic) throws Exception
+   {
+      return 0;  //To change body of implemented methods use File | Settings | File Templates.
+   }
+
+   private List<SubscriptionInfo> listSubscriptions(JBossTopic topic) throws Exception
+   {
+      return listSubscriptions(topic, ListType.ALL);
+   }
+
+   private List<SubscriptionInfo> listSubscriptions(JBossTopic topic, ListType type) throws Exception
+   {
+      List<SubscriptionInfo> subs = new ArrayList<SubscriptionInfo>();
+
+      List<Queue> queues = messagingServerManagement.getQueuesForAddress(topic.getAddress());
+
+      for (Queue queue : queues)
+      {
+         if (type == ListType.ALL || (type == ListType.DURABLE && queue.isDurable()) || (type == ListType.NON_DURABLE && !queue.isDurable()))
+         {
+            String subName = null;
+            String clientID = null;
+
+            if (queue.isDurable())
+            {
+               MessageQueueNameHelper helper = MessageQueueNameHelper.createHelper(queue.getName());
+               subName = helper.getSubName();
+               clientID = helper.getClientId();
+            }
+
+            SubscriptionInfo info = new SubscriptionInfo(queue.getName(), queue.isDurable(), subName, clientID,
+                    queue.getFilter() == null ? null : queue.getFilter().getFilterString(), queue.getMessageCount(), queue.getMaxSize());
+
+            subs.add(info);
+         }
+      }
+
+      return subs;
+   }
+
+   private int getSubscriptionsCount(JBossTopic topic) throws Exception
+   {
+      return getSubscriptionsCount(topic, ListType.ALL);
+   }
+
+   public int getSubscriptionsCount(JBossTopic topic, ListType listType) throws Exception
+   {
+      return listSubscriptions(topic, listType).size();
+   }
+
+   public int getConsumerCount(JBossQueue queue) throws Exception
+   {
+      return messagingServerManagement.getConsumerCountForQueue(queue.getAddress());
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServer.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServer.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -102,4 +102,6 @@
    void removeAllMessagesForAddress(String address) throws Exception;
 
    void removeAllMessagesForBinding(String name) throws Exception;
+
+   void removeMessageForBinding(String name, Filter filter) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/messaging/core/MessagingServerManagement.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core;
 
 import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
 import org.jboss.jms.client.api.ClientConnectionFactory;
 import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
 
@@ -58,6 +59,12 @@
 
    void removeAllMessagesForBinding(String name) throws Exception;
 
+   List<Message> listMessages(String queueName, Filter filter) throws Exception;
+
+   void removeMessageForBinding(String binding, Filter filter) throws Exception;
+
+   void removeMessageForAddress(String binding, Filter filter) throws Exception;
+
    int getMessageCountForQueue(String queue) throws Exception;
 
    void registerMessageCounter(String queueName) throws Exception;
@@ -87,4 +94,6 @@
    public int getConsumerCountForQueue(String queue) throws Exception;
 
    List<ServerConnectionEndpoint> getActiveConnections();
+
+   void moveMessages(String toQueue, String fromQueue, FilterImpl filter) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/PriorityLinkedList.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PriorityLinkedList.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/messaging/core/PriorityLinkedList.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -55,4 +55,6 @@
    ListIterator<T> iterator();
    
    boolean isEmpty();
+
+   void remove(T messageReference, int priority);
 }

Modified: trunk/src/main/org/jboss/messaging/core/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Queue.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/messaging/core/Queue.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -21,6 +21,8 @@
   */
 package org.jboss.messaging.core;
 
+import org.jboss.messaging.core.impl.filter.FilterImpl;
+
 import java.util.LinkedList;
 import java.util.List;
 
@@ -58,7 +60,11 @@
    List<MessageReference> list(Filter filter);
    
    void removeAllReferences();
-   
+
+   void removeReference(MessageReference messageReference);
+
+   List<MessageReference> removeReferences(Filter filter);
+
    long getPersistenceID();
    
    void setPersistenceID(long id);
@@ -110,4 +116,5 @@
    int getMessageCounterHistoryDayLimit();
    
    void setMessageCounterHistoryDayLimit(int limit);
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/impl/PriorityLinkedListImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/PriorityLinkedListImpl.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/messaging/core/impl/PriorityLinkedListImpl.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -169,7 +169,16 @@
    {
       return size == 0;
    }
-   
+
+   public void remove(T messageReference, int priority)
+   {
+      LinkedList linkedList = linkedLists.get(priority);
+      if(linkedList != null)
+      {
+         linkedList.remove(messageReference);
+      }
+   }
+
    public ListIterator<T> iterator()
    {
       return new PriorityLinkedListIterator();

Modified: trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/messaging/core/impl/QueueImpl.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -308,6 +308,32 @@
       }
    }
 
+   public synchronized void removeReference(MessageReference messageReference)
+   {
+      messageReferences.remove(messageReference , messageReference.getMessage().getPriority());
+      if (!this.scheduledTimeouts.isEmpty())
+      {
+         Set<Timeout> clone = new HashSet<Timeout>(scheduledTimeouts);
+
+         for (Timeout timeout: clone)
+         {
+            timeout.cancel();
+         }
+
+         scheduledTimeouts.clear();
+      }
+   }
+
+   public synchronized List<MessageReference> removeReferences(Filter filter)
+   {
+      List<MessageReference> allRefs = list(filter);
+      for (MessageReference messageReference : allRefs)
+      {
+         removeReference(messageReference);
+      }
+      return allRefs;
+   }
+
    public long getPersistenceID()
    {
       return id;

Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerImpl.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -422,6 +422,20 @@
       }
    }
 
+   public void removeMessageForBinding(String name, Filter filter) throws Exception
+   {
+      Binding binding = postOffice.getBinding(name);
+      if (binding != null)
+      {
+         Queue queue = binding.getQueue();
+         List<MessageReference> allRefs = queue.list(filter);
+         for (MessageReference messageReference : allRefs)
+         {
+            persistenceManager.deleteReference(messageReference);
+            queue.removeReference(messageReference);
+         }
+      }
+   }
 
    public SecurityStore getSecurityManager()
    {

Modified: trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/src/main/org/jboss/messaging/core/impl/server/MessagingServerManagementImpl.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -30,10 +30,12 @@
 import org.jboss.aop.microcontainer.aspects.jmx.JMX;
 import org.jboss.jms.client.api.ClientConnectionFactory;
 import org.jboss.jms.client.impl.ClientConnectionFactoryImpl;
+import org.jboss.jms.client.SelectorTranslator;
 import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
 import org.jboss.messaging.core.*;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.impl.messagecounter.MessageCounter;
+import org.jboss.messaging.core.impl.filter.FilterImpl;
 import org.jboss.messaging.util.MessagingException;
 
 /**
@@ -115,6 +117,35 @@
       messagingServer.removeAllMessagesForBinding(name);
    }
 
+   public List<Message> listMessages(String queueName, Filter filter) throws Exception
+   {
+      List<Message> msgs = new ArrayList<Message>();
+      Queue queue = getQueue(queueName);
+      if(queue != null)
+      {
+         List<MessageReference> allRefs = queue.list(filter);
+         for (MessageReference allRef : allRefs)
+         {
+            msgs.add(allRef.getMessage());
+         }
+      }
+     return msgs;
+  }
+
+   public void removeMessageForBinding(String binding, Filter filter) throws Exception
+   {
+      messagingServer.removeMessageForBinding(binding, filter);
+   }
+
+   public void removeMessageForAddress(String binding, Filter filter) throws Exception
+   {
+      List<Binding> bindings = messagingServer.getPostOffice().getBindingsForAddress(binding);
+      for (Binding binding1 : bindings)
+      {
+         removeMessageForBinding(binding1.getQueue().getName(), filter);
+      }
+   }
+
    public List<Queue> getQueuesForAddress(String address) throws Exception
    {
       List<Queue> queues = new ArrayList<Queue>();
@@ -298,6 +329,18 @@
    {
       return messagingServer.getConnectionManager().getActiveConnections();
    }
+
+   public void moveMessages(String fromQueue, String toQueue, FilterImpl filter) throws Exception
+   {
+      Queue from = getQueue(fromQueue);
+      Queue to = getQueue(toQueue);
+      List<MessageReference> messageReferences = from.removeReferences(filter);
+      for (MessageReference messageReference : messageReferences)
+      {
+         to.addLast(messageReference);
+      }
+
+   }
 //
 ////   public int getDeliveringCountForQueue(String queue) throws Exception
 ////   {

Modified: trunk/tests/src/org/jboss/test/messaging/JBMServerTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/JBMServerTestCase.java	2008-02-18 20:05:24 UTC (rev 3733)
+++ trunk/tests/src/org/jboss/test/messaging/JBMServerTestCase.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -38,6 +38,7 @@
 
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.server.ConnectionManager;
+import org.jboss.jms.server.JMSServerManager;
 import org.jboss.jms.server.security.Role;
 import org.jboss.messaging.core.MessagingServer;
 import org.jboss.messaging.core.MessagingServerManagement;
@@ -322,6 +323,10 @@
       return servers.get(0).getMessagingServer();
    }
 
+   protected JMSServerManager getJmsServerManager() throws Exception
+   {
+      return servers.get(0).getJMSServerManager();
+   }
    /*protected void tearDown() throws Exception
    {
       super.tearDown();

Added: trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/server/JMSServerManagerTest.java	2008-02-19 16:52:32 UTC (rev 3734)
@@ -0,0 +1,557 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+package org.jboss.test.messaging.jms.server;
+
+import org.jboss.test.messaging.JBMServerTestCase;
+import org.jboss.jms.server.JMSServerManager;
+import org.jboss.jms.server.ClientInfo;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.messaging.core.impl.server.SubscriptionInfo;
+
+import javax.jms.*;
+import javax.naming.NameNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ */
+public class JMSServerManagerTest extends JBMServerTestCase
+{
+   JMSServerManager jmsServerManager;
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      jmsServerManager = getJmsServerManager();
+   }
+
+   public void testIsStarted()
+   {
+      assertTrue(jmsServerManager.isStarted());
+   }
+
+
+   public void testCreateAndDestroyQueue() throws Exception
+   {
+      jmsServerManager.createQueue("anewtestqueue", "anewtestqueue"); // bound under a relative JNDI name
+      Queue q = (Queue) getInitialContext().lookup("anewtestqueue");
+      assertNotNull(q);
+      jmsServerManager.destroyQueue("anewtestqueue");
+      try
+      {
+         getInitialContext().lookup("anewtestqueue");
+         fail("should throw eception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass - the binding is gone once the queue is destroyed
+      }
+      jmsServerManager.createQueue("anewtestqueue", "/anewtestqueue"); // bound directly under the root context
+      q = (Queue) getInitialContext().lookup("/anewtestqueue");
+      assertNotNull(q);
+      jmsServerManager.destroyQueue("anewtestqueue");
+      try
+      {
+         getInitialContext().lookup("/anewtestqueue");
+         fail("should throw eception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass - the binding is gone once the queue is destroyed
+      }
+
+      jmsServerManager.createQueue("anewtestqueue", "/queues/anewtestqueue"); // bound one sub-context deep
+      q = (Queue) getInitialContext().lookup("/queues/anewtestqueue"); // re-assign so assertNotNull checks this lookup, not the previous one
+      assertNotNull(q);
+      jmsServerManager.destroyQueue("anewtestqueue");
+      try
+      {
+         getInitialContext().lookup("/queues/anewtestqueue"); // must be the name that was actually bound, otherwise the check is vacuous
+         fail("should throw eception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass - the binding is gone once the queue is destroyed
+      }
+
+      jmsServerManager.createQueue("anewtestqueue", "/queues/and/anewtestqueue"); // bound two sub-contexts deep
+      q = (Queue) getInitialContext().lookup("/queues/and/anewtestqueue");
+      assertNotNull(q);
+      jmsServerManager.destroyQueue("anewtestqueue");
+      try
+      {
+         getInitialContext().lookup("/queues/and/anewtestqueue");
+         fail("should throw eception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass - the binding is gone once the queue is destroyed
+      }
+   }
+
+   public void testCreateAndDestroyTopic() throws Exception
+   {
+      jmsServerManager.createTopic("anewtesttopic", "anewtesttopic"); // bound under a relative JNDI name
+      Topic q = (Topic) getInitialContext().lookup("anewtesttopic");
+      assertNotNull(q);
+      jmsServerManager.destroyTopic("anewtesttopic");
+      try
+      {
+         q = (Topic) getInitialContext().lookup("anewtesttopic");
+         fail("should throw eception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass - the binding is gone once the topic is destroyed
+      }
+      jmsServerManager.createTopic("anewtesttopic", "/anewtesttopic"); // bound directly under the root context
+      q = (Topic) getInitialContext().lookup("/anewtesttopic");
+      assertNotNull(q);
+      jmsServerManager.destroyTopic("anewtesttopic");
+      try
+      {
+         q = (Topic) getInitialContext().lookup("/anewtesttopic");
+         fail("should throw eception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass - the binding is gone once the topic is destroyed
+      }
+
+      jmsServerManager.createTopic("anewtesttopic", "/topics/anewtesttopic"); // bound one sub-context deep
+      q = (Topic) getInitialContext().lookup("/topics/anewtesttopic");
+      assertNotNull(q);
+      jmsServerManager.destroyTopic("anewtesttopic");
+      try
+      {
+         q = (Topic) getInitialContext().lookup("/topics/anewtesttopic"); // must be the name that was actually bound, otherwise the check is vacuous
+         fail("should throw eception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass - the binding is gone once the topic is destroyed
+      }
+
+      jmsServerManager.createTopic("anewtesttopic", "/topics/and/anewtesttopic"); // bound two sub-contexts deep
+      q = (Topic) getInitialContext().lookup("/topics/and/anewtesttopic");
+      assertNotNull(q);
+      jmsServerManager.destroyTopic("anewtesttopic");
+      try
+      {
+         q = (Topic) getInitialContext().lookup("/topics/and/anewtesttopic");
+         fail("should throw eception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass - the binding is gone once the topic is destroyed
+      }
+   }
+
+   public void testCreateAndDestroyConectionFactory() throws Exception
+   {
+      jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 100, "newtestcf");
+      JBossConnectionFactory jbcf = (JBossConnectionFactory) getInitialContext().lookup("newtestcf");
+      assertNotNull(jbcf);
+      assertNotNull(jbcf.getDelegate());
+      jmsServerManager.destroyConnectionFactory("newtestcf");
+      try
+      {
+         getInitialContext().lookup("newtestcf");
+         fail("should throw exception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass
+      }
+      ArrayList<String> bindings = new ArrayList<String>();
+      bindings.add("oranewtestcf");
+      bindings.add("newtestcf");
+      jmsServerManager.createConnectionFactory("newtestcf", "anid", 100, true, 100, bindings);
+      jbcf = (JBossConnectionFactory) getInitialContext().lookup("newtestcf");
+      assertNotNull(jbcf);
+      assertNotNull(jbcf.getDelegate());
+      jbcf = (JBossConnectionFactory) getInitialContext().lookup("oranewtestcf");
+      assertNotNull(jbcf);
+      assertNotNull(jbcf.getDelegate());
+      jmsServerManager.destroyConnectionFactory("newtestcf");
+      try
+      {
+         getInitialContext().lookup("newtestcf");
+         fail("should throw exception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass
+      }
+      try
+      {
+         getInitialContext().lookup("oranewtestcf");
+         fail("should throw exception");
+      }
+      catch (NameNotFoundException e)
+      {
+         //pass
+      }
+   }
+
+   public void testClientInfo() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      List<ClientInfo> clientInfos = jmsServerManager.getClients();
+      assertNotNull(clientInfos);
+      assertEquals(1, clientInfos.size());
+      ClientInfo clientInfo = clientInfos.get(0);
+      assertEquals("guest", clientInfo.getUser());
+      assertEquals(ClientInfo.status.STOPPED, clientInfo.getStatus());
+      conn.start();
+      clientInfos = jmsServerManager.getClients();
+      assertNotNull(clientInfos);
+      assertEquals(1, clientInfos.size());
+      clientInfo = clientInfos.get(0);
+      assertEquals(ClientInfo.status.STARTED, clientInfo.getStatus());
+      clientInfo.getAddress();
+      clientInfo.getTimeCreated();
+      clientInfo.getAliveTime();
+      conn.close();
+      clientInfos = jmsServerManager.getClients();
+      assertNotNull(clientInfos);
+      assertEquals(0, clientInfos.size());
+      Connection conn2 = getConnectionFactory().createConnection("guest", "guest");
+      Connection conn3 = getConnectionFactory().createConnection("guest", "guest");
+      clientInfos = jmsServerManager.getClients();
+      assertNotNull(clientInfos);
+      assertEquals(2, clientInfos.size());
+      conn2.close();
+      clientInfos = jmsServerManager.getClients();
+      assertNotNull(clientInfos);
+      assertEquals(1, clientInfos.size());
+      conn3.close();
+      clientInfos = jmsServerManager.getClients();
+      assertNotNull(clientInfos);
+      assertEquals(0, clientInfos.size());
+   }
+
+   public void test() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      Session sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+      sess.createConsumer(queue1); // first of six consumers on queue1
+      jmsServerManager.createQueue("Queue1", "binding");
+      jmsServerManager.getConsumerCountForQueue("Queue1"); // result intentionally unused; only checks the call succeeds
+      sess.createConsumer(queue1);
+      sess.createConsumer(queue1);
+      sess.createConsumer(queue1);
+      sess.createConsumer(queue1);
+      sess.createConsumer(queue1);
+      assertEquals(6, jmsServerManager.getConsumerCountForQueue("Queue1")); // expected value first, as JUnit's failure message assumes
+      conn.close();
+      assertEquals(0, jmsServerManager.getConsumerCountForQueue("Queue1")); // no consumers may remain after the connection is closed
+      conn = getConnectionFactory().createConnection("guest", "guest");
+      sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+      sess.createConsumer(topic1);
+      conn.close();
+   }
+
+   public void testListMessagesForQueue() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      try
+      {
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(queue1);
+         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // messages 0-9 are sent non-persistent
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            message.setIntProperty("count", i);
+            producer.send(message);
+         }
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT); // messages 10-19 are sent persistent
+         for (int i = 10; i < 20; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            message.setIntProperty("count", i);
+            producer.send(message);
+         }
+         List<Message> messageList = jmsServerManager.listMessagesForQueue("Queue1");
+         assertEquals(20, messageList.size()); // expected value first, as JUnit's failure message assumes
+         for (int i = 0; i < messageList.size(); i++)
+         {
+            Message message = messageList.get(i);
+            assertEquals(i, message.getIntProperty("count")); // listing must preserve send order
+            assertTrue(message instanceof TextMessage);
+         }
+         messageList = jmsServerManager.listMessagesForQueue("Queue1", JMSServerManager.ListType.NON_DURABLE);
+         assertEquals(10, messageList.size());
+         for (int i = 0; i < messageList.size(); i++)
+         {
+            Message message = messageList.get(i);
+            assertEquals(i, message.getIntProperty("count"));
+            assertTrue(message instanceof TextMessage);
+            assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode()); // assertEquals reports both values on failure
+         }
+         messageList = jmsServerManager.listMessagesForQueue("Queue1", JMSServerManager.ListType.DURABLE);
+         assertEquals(10, messageList.size());
+         for (int i = 10; i < messageList.size() + 10; i++)
+         {
+            Message message = messageList.get(i - 10); // i is the expected "count" value, i - 10 the list index
+            assertEquals(i, message.getIntProperty("count"));
+            assertTrue(message instanceof TextMessage);
+            assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   public void testListMessagesForSubscription() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      try
+      {
+         String cid = "myclientid";
+         String id = "mysubid";
+         conn.setClientID(cid);
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         TopicSubscriber subscriber = sess.createDurableSubscriber(topic1, id);
+         MessageProducer producer = sess.createProducer(topic1);
+
+         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            message.setIntProperty("count", i);
+            producer.send(message);
+         }
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         for (int i = 10; i < 20; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            message.setIntProperty("count", i);
+            producer.send(message);
+         }
+
+         assertEquals(20, jmsServerManager.listMessagesForSubscription(cid + "." + id).size());
+         assertEquals(10, jmsServerManager.listMessagesForSubscription(cid + "." + id, JMSServerManager.ListType.DURABLE).size());
+         assertEquals(10, jmsServerManager.listMessagesForSubscription(cid + "." + id, JMSServerManager.ListType.NON_DURABLE).size());
+         subscriber.close();
+         sess.unsubscribe(id);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   public void testRemoveMessageFromQueue() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      try
+      {
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(queue1);
+         Message messageToDelete = null;
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            message.setIntProperty("pos", i);
+            producer.send(message);
+            if (i == 5)
+            {
+               messageToDelete = message; // remember the sixth message; its id is only read after send()
+            }
+         }
+         jmsServerManager.removeMessageFromQueue("Queue1", messageToDelete.getJMSMessageID());
+         sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = sess.createConsumer(queue1);
+         conn.start();
+         int lastPos = -1;
+         for (int i = 0; i < 9; i++) // ten sent, one removed: nine must remain
+         {
+            Message message = consumer.receive();
+            assertFalse(messageToDelete.getJMSMessageID().equals(message.getJMSMessageID())); // compare ids by value; assertNotSame only compares references
+            int pos = message.getIntProperty("pos");
+            assertTrue("returned in wrong order", pos > lastPos);
+            lastPos = pos;
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   public void testRemoveMessageFromTopic() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      try
+      {
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(topic1);
+         MessageConsumer consumer = sess.createConsumer(topic1); // two subscribers, so the removal is checked on both
+         MessageConsumer consumer2 = sess.createConsumer(topic1);
+         Message messageToDelete = null;
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            producer.send(message);
+            if (i == 5)
+            {
+               messageToDelete = message; // remember the sixth message; its id is only read after send()
+            }
+         }
+         jmsServerManager.removeMessageFromTopic("Topic1", messageToDelete.getJMSMessageID());
+         conn.start();
+         for (int i = 0; i < 9; i++) // ten sent, one removed: nine must remain per subscriber
+         {
+            Message message = consumer.receive();
+            assertFalse(messageToDelete.getJMSMessageID().equals(message.getJMSMessageID())); // compare ids by value; assertNotSame only compares references
+            message = consumer2.receive();
+            assertFalse(messageToDelete.getJMSMessageID().equals(message.getJMSMessageID()));
+         }
+      }
+      finally
+      {
+         if(conn != null)
+         {
+            conn.close();
+         }
+      }
+
+   }
+
+   public void testRemoveAllMessagesFromQueue() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      try
+      {
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(queue1);
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            producer.send(message);
+         }
+         jmsServerManager.removeAllMessagesForQueue("Queue1");
+         sess = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = sess.createConsumer(queue1);
+         assertEquals("messages still exist", 0, jmsServerManager.getMessageCountForQueue("Queue1"));
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   public void testRemoveAllMessagesFromTopic() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      try
+      {
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(topic1);
+         MessageConsumer consumer = sess.createConsumer(topic1);
+         MessageConsumer consumer2 = sess.createConsumer(topic1);
+         Message messageToDelete = null;
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            producer.send(message);
+            if (i == 5)
+            {
+               messageToDelete = message;
+            }
+         }
+         jmsServerManager.removeAllMessagesForTopic("Topic1");
+         List<SubscriptionInfo> subscriptionInfos = jmsServerManager.listSubscriptions("Topic1");
+         for (SubscriptionInfo subscriptionInfo : subscriptionInfos)
+         {
+            assertEquals(0, jmsServerManager.listMessagesForSubscription(subscriptionInfo.getId()).size());
+         }
+      }
+      finally
+      {
+         if(conn != null)
+         {
+            conn.close();
+         }
+      }
+
+   }
+
+   public void testMoveMessage() throws Exception
+   {
+      Connection conn = getConnectionFactory().createConnection("guest", "guest");
+      try
+      {
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(queue1);
+         Message messageToMove = null;
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage message = sess.createTextMessage();
+            producer.send(message);
+            if (i == 5)
+            {
+               messageToMove = message; // remember the sixth message; its id is only read after send()
+            }
+         }
+         jmsServerManager.moveMessage("Queue1", "Queue2", messageToMove.getJMSMessageID());
+         MessageConsumer consumer = sess.createConsumer(queue1);
+         conn.start();
+         for (int i = 0; i < 9; i++) // ten sent, one moved away: nine must remain on queue1
+         {
+            Message message = consumer.receive();
+            assertFalse(messageToMove.getJMSMessageID().equals(message.getJMSMessageID())); // compare ids by value; assertNotSame only compares references
+         }
+         consumer.close();
+         consumer = sess.createConsumer(queue2);
+         Message message = consumer.receive(); // the moved message must now be delivered from queue2
+         assertEquals(messageToMove.getJMSMessageID(), message.getJMSMessageID());
+      }
+      finally
+      {
+         if(conn != null)
+         {
+            conn.close();
+         }
+      }
+
+   }
+}




More information about the jboss-cvs-commits mailing list