[jboss-cvs] JBoss Messaging SVN: r4763 - in branches/Branch_JBMESSAGING-1303: src/main/org/jboss/messaging/core/management/impl and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Aug 1 09:28:18 EDT 2008
Author: jmesnil
Date: 2008-08-01 09:28:18 -0400 (Fri, 01 Aug 2008)
New Revision: 4763
Modified:
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java
Log:
JBMESSAGING-1303: Revisit management interfaces
* added many operations and attributes to JMSQueueControlMBean
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/MessagingServerManagement.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -26,7 +26,9 @@
import java.util.Set;
import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.security.Role;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
@@ -98,6 +100,14 @@
List<Queue> getQueuesForAddress(SimpleString address) throws Exception;
/**
+ * @param messageID
+ * @param simpleAddress
+ * @return
+ * @throws Exception
+ */
+ boolean removeMessageFromAddress(long messageID, SimpleString address) throws Exception;
+
+ /**
* remove all the messages for a specific address
* @param address the address
* @throws Exception if a problem occurred
@@ -290,5 +300,43 @@
*/
String getVersion();
+ /**
+ * @param messageID
+ * @param address
+ * @return
+ * @throws Exception
+ */
+ boolean expireMessage(long messageID, SimpleString address) throws Exception;
+ /**
+ * @param filter
+ * @param address
+ * @return
+ * @throws Exception
+ */
+ List<MessageReference> expireMessages(Filter filter,
+ SimpleString address) throws Exception;
+
+ /**
+ * @param simpleAddress
+ * @return
+ */
+ QueueSettings getQueueSettings(SimpleString simpleAddress);
+
+ /**
+ * @param filter
+ * @param simpleAddress
+ * @return
+ */
+ List<MessageReference> sendMessagesToDLQ(Filter filter,
+ SimpleString simpleAddress) throws Exception;
+
+ /**
+ * @param filter
+ * @param newPriority
+ * @param simpleAddress
+ * @return
+ */
+ List<MessageReference> changeMessagesPriority(Filter filter,
+ byte newPriority, SimpleString address) throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -85,7 +85,7 @@
@Parameter(name = "messageID", desc = "A message ID") long messageID)
throws Exception;
- @Operation(desc = "Remove the message corresponding to the given filter (and returns the number of expired messages)", impact = ACTION)
+ @Operation(desc = "Remove the messages corresponding to the given filter (and returns the number of expired messages)", impact = ACTION)
int expireMessages(
@Parameter(name = "filter", desc = "A message filter") String filter)
throws Exception;
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/MessagingServerManagementImpl.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -18,11 +18,12 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.management.impl;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -35,6 +36,7 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.ConnectionManager;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -42,63 +44,64 @@
import org.jboss.messaging.util.SimpleString;
/**
- * This interface describes the properties and operations that comprise the management interface of the
- * Messaging Server.
- * <p/>
- * It includes operations to create and destroy queues and provides various statistics measures
- * such as message count for queues and topics.
- *
+ * This interface describes the properties and operations that comprise the
+ * management interface of the Messaging Server. <p/> It includes operations to
+ * create and destroy queues and provides various statistics measures such as
+ * message count for queues and topics.
+ *
* @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
* @author <a href="mailto:ataylor at redhat.com>Andy Taylor</a>
*/
-//@JMX(name = "jboss.messaging:service=MessagingServerManagement", exposedInterface = MessagingServerManagement.class)
+// @JMX(name = "jboss.messaging:service=MessagingServerManagement",
+// exposedInterface = MessagingServerManagement.class)
public class MessagingServerManagementImpl implements MessagingServerManagement
{
- //private MessagingServer messagingServer;
+ // private MessagingServer messagingServer;
-// private HashMap<String, MessageCounter> currentCounters = new HashMap<String, MessageCounter>();
-//
-// private HashMap<String, ScheduledFuture> currentRunningCounters = new HashMap<String, ScheduledFuture>();
-//
-// private ScheduledExecutorService scheduler;
-//
-// private int maxMessageCounters = 20;
-
+ // private HashMap<String, MessageCounter> currentCounters = new
+ // HashMap<String, MessageCounter>();
+ //
+ // private HashMap<String, ScheduledFuture> currentRunningCounters = new
+ // HashMap<String, ScheduledFuture>();
+ //
+ // private ScheduledExecutorService scheduler;
+ //
+ // private int maxMessageCounters = 20;
+
private final PostOffice postOffice;
-
+
private final StorageManager storageManager;
-
+
private final Configuration configuration;
-
+
private final ConnectionManager connectionManager;
-
+
private final MessagingServer server;
-
+
private HierarchicalRepository<Set<Role>> securityRepository;
-
+
private HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
-
-
- public MessagingServerManagementImpl(final PostOffice postOffice, final StorageManager storageManager,
- final Configuration configuration,
- final ConnectionManager connectionManager,
- final HierarchicalRepository<Set<Role>> securityRepository,
- final HierarchicalRepository<QueueSettings> queueSettingsRepository,
- final MessagingServer server)
+
+ public MessagingServerManagementImpl(final PostOffice postOffice,
+ final StorageManager storageManager,
+ final Configuration configuration,
+ final ConnectionManager connectionManager,
+ final HierarchicalRepository<Set<Role>> securityRepository,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository,
+ final MessagingServer server)
{
this.postOffice = postOffice;
-
+
this.storageManager = storageManager;
-
+
this.configuration = configuration;
-
+
this.connectionManager = connectionManager;
-
+
this.server = server;
-
+
this.securityRepository = securityRepository;
-
+
this.queueSettingsRepository = queueSettingsRepository;
}
@@ -112,15 +115,18 @@
return server.getVersion().getFullVersion();
}
- public void createQueue(SimpleString address, SimpleString name) throws Exception
+ public void createQueue(SimpleString address, SimpleString name)
+ throws Exception
{
if (postOffice.getBinding(name) == null)
{
postOffice.addBinding(address, name, null, true, false);
}
}
-
- public void createQueue(SimpleString address, SimpleString name, SimpleString filterStr, boolean durable, boolean temporary) throws Exception
+
+ public void createQueue(SimpleString address, SimpleString name,
+ SimpleString filterStr, boolean durable, boolean temporary)
+ throws Exception
{
if (postOffice.getBinding(name) == null)
{
@@ -132,7 +138,7 @@
postOffice.addBinding(address, name, filter, durable, temporary);
}
}
-
+
public int getConnectionCount()
{
return connectionManager.size();
@@ -147,7 +153,7 @@
Queue queue = binding.getQueue();
queue.deleteAllReferences(storageManager);
-
+
postOffice.removeBinding(queue.getName());
}
}
@@ -162,7 +168,8 @@
return postOffice.removeDestination(address, false);
}
- public void removeAllMessagesForAddress(SimpleString address) throws Exception
+ public void removeAllMessagesForAddress(SimpleString address)
+ throws Exception
{
List<Binding> bindings = postOffice.getBindingsForAddress(address);
@@ -173,59 +180,164 @@
queue.deleteAllReferences(storageManager);
}
}
-//
-// public void removeAllMessagesForBinding(SimpleString name) throws Exception
-// {
-// Binding binding = messagingServer.getPostOffice().getBinding(name);
-// if (binding != null)
-// {
-// Queue queue = binding.getQueue();
-//
-// queue.deleteAllReferences(messagingServer.getStorageManager());
-// }
-// }
-//
-// public List<Message> listMessages(SimpleString queueName, Filter filter) throws Exception
-// {
-// List<Message> msgs = new ArrayList<Message>();
-// Queue queue = getQueue(queueName);
-// if (queue != null)
-// {
-// List<MessageReference> allRefs = queue.list(filter);
-// for (MessageReference allRef : allRefs)
-// {
-// msgs.add(allRef.getMessage());
-// }
-// }
-// return msgs;
-// }
-// public void removeMessageForBinding(String name, Filter filter) throws Exception
-// {
-// Binding binding = messagingServer.getPostOffice().getBinding(name);
-// if (binding != null)
-// {
-// Queue queue = binding.getQueue();
-// List<MessageReference> allRefs = queue.list(filter);
-// for (MessageReference messageReference : allRefs)
-// {
-// messagingServer.getPersistenceManager().deleteReference(messageReference);
-// queue.removeReference(messageReference);
-// }
-// }
-// }
+ public boolean removeMessageFromAddress(long messageID, SimpleString address)
+ throws Exception
+ {
+ Binding binding = postOffice.getBinding(address);
+ if (binding != null)
+ {
+ Queue queue = binding.getQueue();
+ return queue.deleteReference(messageID, storageManager);
+ }
+ return false;
+ }
-// public void removeMessageForAddress(String binding, Filter filter) throws Exception
-// {
-// List<Binding> bindings = messagingServer.getPostOffice().getBindingsForAddress(binding);
-// for (Binding binding1 : bindings)
-// {
-// removeMessageForBinding(binding1.getQueue().getName(), filter);
-// }
-// }
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.jboss.messaging.core.management.MessagingServerManagement#expireMessage
+ * (long, org.jboss.messaging.util.SimpleString)
+ */
+ public boolean expireMessage(long messageID, SimpleString address)
+ throws Exception
+ {
+ Binding binding = postOffice.getBinding(address);
+ if (binding != null)
+ {
+ Queue queue = binding.getQueue();
+ return queue.expireMessage(messageID, storageManager, postOffice,
+ queueSettingsRepository);
+ }
+ return false;
+ }
- public List<Queue> getQueuesForAddress(SimpleString address) throws Exception
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.jboss.messaging.core.management.MessagingServerManagement#expireMessages
+ * (org.jboss.messaging.core.filter.Filter,
+ * org.jboss.messaging.util.SimpleString)
+ */
+ public List<MessageReference> expireMessages(Filter filter,
+ SimpleString address) throws Exception
{
+ Binding binding = postOffice.getBinding(address);
+ if (binding != null)
+ {
+ Queue queue = binding.getQueue();
+ List<MessageReference> refs = queue.list(filter);
+ for (MessageReference ref : refs)
+ {
+ queue.expireMessage(ref.getMessage().getMessageID(),
+ storageManager, postOffice, queueSettingsRepository);
+ }
+ return refs;
+ }
+ return Collections.emptyList();
+ }
+
+ public List<MessageReference> sendMessagesToDLQ(Filter filter,
+ SimpleString address) throws Exception
+ {
+ Binding binding = postOffice.getBinding(address);
+ if (binding != null)
+ {
+ Queue queue = binding.getQueue();
+ List<MessageReference> refs = queue.list(filter);
+ for (MessageReference ref : refs)
+ {
+ queue.sendMessageToDLQ(ref.getMessage().getMessageID(),
+ storageManager, postOffice, queueSettingsRepository);
+ }
+ return refs;
+ }
+ return Collections.emptyList();
+ }
+
+ public List<MessageReference> changeMessagesPriority(Filter filter,
+ byte newPriority, SimpleString address) throws Exception
+ {
+ Binding binding = postOffice.getBinding(address);
+ if (binding != null)
+ {
+ Queue queue = binding.getQueue();
+ List<MessageReference> refs = queue.list(filter);
+ for (MessageReference ref : refs)
+ {
+ queue.changeMessagePriority(ref.getMessage().getMessageID(), newPriority, storageManager, postOffice, queueSettingsRepository);
+ }
+ return refs;
+ }
+ return Collections.emptyList();
+ }
+
+ public QueueSettings getQueueSettings(SimpleString simpleAddress)
+ {
+ return queueSettingsRepository.getMatch(simpleAddress.toString());
+ }
+
+ //
+ // public void removeAllMessagesForBinding(SimpleString name) throws
+ // Exception
+ // {
+ // Binding binding = messagingServer.getPostOffice().getBinding(name);
+ // if (binding != null)
+ // {
+ // Queue queue = binding.getQueue();
+ //
+ // queue.deleteAllReferences(messagingServer.getStorageManager());
+ // }
+ // }
+ //
+ // public List<Message> listMessages(SimpleString queueName, Filter filter)
+ // throws Exception
+ // {
+ // List<Message> msgs = new ArrayList<Message>();
+ // Queue queue = getQueue(queueName);
+ // if (queue != null)
+ // {
+ // List<MessageReference> allRefs = queue.list(filter);
+ // for (MessageReference allRef : allRefs)
+ // {
+ // msgs.add(allRef.getMessage());
+ // }
+ // }
+ // return msgs;
+ // }
+
+ // public void removeMessageForBinding(String name, Filter filter) throws
+ // Exception
+ // {
+ // Binding binding = messagingServer.getPostOffice().getBinding(name);
+ // if (binding != null)
+ // {
+ // Queue queue = binding.getQueue();
+ // List<MessageReference> allRefs = queue.list(filter);
+ // for (MessageReference messageReference : allRefs)
+ // {
+ // messagingServer.getPersistenceManager().deleteReference(messageReference);
+ // queue.removeReference(messageReference);
+ // }
+ // }
+ // }
+
+ // public void removeMessageForAddress(String binding, Filter filter) throws
+ // Exception
+ // {
+ // List<Binding> bindings =
+ // messagingServer.getPostOffice().getBindingsForAddress(binding);
+ // for (Binding binding1 : bindings)
+ // {
+ // removeMessageForBinding(binding1.getQueue().getName(), filter);
+ // }
+ // }
+
+ public List<Queue> getQueuesForAddress(SimpleString address)
+ throws Exception
+ {
List<Queue> queues = new ArrayList<Queue>();
List<Binding> bindings = postOffice.getBindingsForAddress(address);
@@ -241,245 +353,260 @@
{
return getQueue(queue).getMessageCount();
}
-
- public void setSecurityForAddress(String address, Set<Role> roles) throws Exception
+
+ public void setSecurityForAddress(String address, Set<Role> roles)
+ throws Exception
{
this.securityRepository.addMatch(address, roles);
}
-
+
public void removeSecurityForAddress(String address) throws Exception
{
this.securityRepository.removeMatch(address);
}
-
+
public Set<Role> getSecurityForAddress(String address) throws Exception
{
return this.securityRepository.getMatch(address);
}
-
- public void setQueueAttributes(String queueName, QueueSettings settings) throws Exception
+
+ public void setQueueAttributes(String queueName, QueueSettings settings)
+ throws Exception
{
this.queueSettingsRepository.addMatch(queueName, settings);
}
-
-
-//
-// public int getMaxMessageCounters()
-// {
-// return maxMessageCounters;
-// }
-//
-// public void setMaxMessageCounters(int maxMessageCounters)
-// {
-// this.maxMessageCounters = maxMessageCounters;
-// }
-//
-// public void registerMessageCounter(final SimpleString queueName) throws Exception
-// {
-// if (currentCounters.get(queueName) != null)
-// {
-// throw new IllegalStateException("Message Counter Already Registered");
-// }
-// Binding binding = messagingServer.getPostOffice().getBinding(queueName);
-// if (binding == null)
-// {
-// throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
-// }
-// Queue queue = binding.getQueue();
-// currentCounters.put(queueName, new MessageCounter(queue.getName(), queue, queue.isDurable(),
-// messagingServer.getQueueSettingsRepository().getMatch(queue.getName()).getMessageCounterHistoryDayLimit()));
-// }
-//
-// public void unregisterMessageCounter(final SimpleString queueName) throws Exception
-// {
-// if (currentCounters.get(queueName) == null)
-// {
-// throw new MessagingException(MessagingException.ILLEGAL_STATE, "Counter is not registered");
-// }
-// currentCounters.remove(queueName);
-// if (currentRunningCounters.get(queueName) != null)
-// {
-// currentRunningCounters.get(queueName).cancel(true);
-// currentRunningCounters.remove(queueName);
-// }
-// }
-//
-// public void startMessageCounter(final String SimpleString, long duration) throws Exception
-// {
-// MessageCounter messageCounter = currentCounters.get(queueName);
-// if (messageCounter == null)
-// {
-// Binding binding = messagingServer.getPostOffice().getBinding(queueName);
-// if (binding == null)
-// {
-// throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
-// }
-// Queue queue = binding.getQueue();
-// messageCounter = new MessageCounter(queue.getName(), queue, queue.isDurable(),
-// messagingServer.getQueueSettingsRepository().getMatch(queue.getName()).getMessageCounterHistoryDayLimit());
-// }
-// currentCounters.put(queueName, messageCounter);
-// messageCounter.resetCounter();
-// if (duration > 0)
-// {
-//
-// ScheduledFuture future = scheduler.schedule(new Runnable()
-// {
-// public void run()
-// {
-// currentCounters.get(queueName).sample();
-// }
-// }, duration, TimeUnit.SECONDS);
-// currentRunningCounters.put(queueName, future);
-// }
-// }
-//
-// public MessageCounter stopMessageCounter(SimpleString queueName) throws Exception
-// {
-// MessageCounter messageCounter = currentCounters.get(queueName);
-// if (messageCounter == null)
-// {
-// throw new IllegalArgumentException(queueName + "counter not registered");
-// }
-// if (currentRunningCounters.get(queueName) != null)
-// {
-// currentRunningCounters.get(queueName).cancel(true);
-// currentRunningCounters.remove(queueName);
-// }
-// messageCounter.sample();
-// return messageCounter;
-// }
-//
-// public MessageCounter getMessageCounter(SimpleString queueName)
-// {
-// MessageCounter messageCounter = currentCounters.get(queueName);
-// if (messageCounter != null && currentRunningCounters.get(queueName) == null)
-// {
-// messageCounter.sample();
-// }
-// return messageCounter;
-// }
-//
-//
-// public Collection<MessageCounter> getMessageCounters()
-// {
-// for (String s : currentCounters.keySet())
-// {
-// currentCounters.get(s).sample();
-// }
-// return currentCounters.values();
-// }
-//
-// public void resetMessageCounter(SimpleString queue)
-// {
-// MessageCounter messageCounter = currentCounters.get(queue);
-// if (messageCounter != null)
-// {
-// messageCounter.resetCounter();
-// }
-// }
-//
-// public void resetMessageCounters()
-// {
-// Set<String> counterNames = currentCounters.keySet();
-// for (String counterName : counterNames)
-// {
-// resetMessageCounter(counterName);
-// }
-// }
-//
-// public void resetMessageCounterHistory(SimpleString queue)
-// {
-// MessageCounter messageCounter = currentCounters.get(queue);
-// if (messageCounter != null)
-// {
-// messageCounter.resetHistory();
-// }
-// }
-//
-// public void resetMessageCounterHistories()
-// {
-// Set<String> counterNames = currentCounters.keySet();
-// for (String counterName : counterNames)
-// {
-// resetMessageCounterHistory(counterName);
-// }
-// }
-//
-// public List<MessageCounter> stopAllMessageCounters() throws Exception
-// {
-// Set<String> counterNames = currentCounters.keySet();
-// List<MessageCounter> messageCounters = new ArrayList<MessageCounter>();
-// for (String counterName : counterNames)
-// {
-// messageCounters.add(stopMessageCounter(counterName));
-// }
-// return messageCounters;
-// }
-//
-// public void unregisterAllMessageCounters() throws Exception
-// {
-// Set<String> counterNames = currentCounters.keySet();
-// for (String counterName : counterNames)
-// {
-// unregisterMessageCounter(counterName);
-// }
-// }
-//
-// public int getConsumerCountForQueue(SimpleString queue) throws Exception
-// {
-// return getQueue(queue).getConsumerCount();
-// }
-//
-// public List<ServerConnection> getActiveConnections()
-// {
-// return messagingServer.getConnectionManager().getActiveConnections();
-// }
-// public void moveMessages(String fromQueue, String toQueue, String filter) throws Exception
-// {
-// Filter actFilter = new FilterImpl(filter);
-// Queue from = getQueue(fromQueue);
-// Queue to = getQueue(toQueue);
-// List<MessageReference> messageReferences = from.list(actFilter);
-// for (MessageReference messageReference : messageReferences)
-// {
-// from.move(messageReference, to, messagingServer.getPersistenceManager());
-// }
-//
-// }
-//
-// public void expireMessages(SimpleString queue, SimpleString filter) throws Exception
-// {
-// Filter actFilter = new FilterImpl(filter);
-// List<MessageReference> allRefs = getQueue(queue).list(actFilter);
-// for (MessageReference messageReference : allRefs)
-// {
-// messageReference.getMessage().setExpiration(System.currentTimeMillis());
-// }
-// }
+ //
+ // public int getMaxMessageCounters()
+ // {
+ // return maxMessageCounters;
+ // }
+ //
+ // public void setMaxMessageCounters(int maxMessageCounters)
+ // {
+ // this.maxMessageCounters = maxMessageCounters;
+ // }
+ //
+ // public void registerMessageCounter(final SimpleString queueName) throws
+ // Exception
+ // {
+ // if (currentCounters.get(queueName) != null)
+ // {
+ // throw new IllegalStateException("Message Counter Already Registered");
+ // }
+ // Binding binding = messagingServer.getPostOffice().getBinding(queueName);
+ // if (binding == null)
+ // {
+ // throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ // }
+ // Queue queue = binding.getQueue();
+ // currentCounters.put(queueName, new MessageCounter(queue.getName(), queue,
+ // queue.isDurable(),
+ // messagingServer.getQueueSettingsRepository().getMatch(queue.getName()).
+ // getMessageCounterHistoryDayLimit()));
+ // }
+ //
+ // public void unregisterMessageCounter(final SimpleString queueName) throws
+ // Exception
+ // {
+ // if (currentCounters.get(queueName) == null)
+ // {
+ // throw new MessagingException(MessagingException.ILLEGAL_STATE,
+ // "Counter is not registered");
+ // }
+ // currentCounters.remove(queueName);
+ // if (currentRunningCounters.get(queueName) != null)
+ // {
+ // currentRunningCounters.get(queueName).cancel(true);
+ // currentRunningCounters.remove(queueName);
+ // }
+ // }
+ //
+ // public void startMessageCounter(final String SimpleString, long duration)
+ // throws Exception
+ // {
+ // MessageCounter messageCounter = currentCounters.get(queueName);
+ // if (messageCounter == null)
+ // {
+ // Binding binding = messagingServer.getPostOffice().getBinding(queueName);
+ // if (binding == null)
+ // {
+ // throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
+ // }
+ // Queue queue = binding.getQueue();
+ // messageCounter = new MessageCounter(queue.getName(), queue,
+ // queue.isDurable(),
+ // messagingServer.getQueueSettingsRepository().getMatch(queue.getName()).
+ // getMessageCounterHistoryDayLimit());
+ // }
+ // currentCounters.put(queueName, messageCounter);
+ // messageCounter.resetCounter();
+ // if (duration > 0)
+ // {
+ //
+ // ScheduledFuture future = scheduler.schedule(new Runnable()
+ // {
+ // public void run()
+ // {
+ // currentCounters.get(queueName).sample();
+ // }
+ // }, duration, TimeUnit.SECONDS);
+ // currentRunningCounters.put(queueName, future);
+ // }
+ // }
+ //
+ // public MessageCounter stopMessageCounter(SimpleString queueName) throws
+ // Exception
+ // {
+ // MessageCounter messageCounter = currentCounters.get(queueName);
+ // if (messageCounter == null)
+ // {
+ // throw new IllegalArgumentException(queueName + "counter not registered");
+ // }
+ // if (currentRunningCounters.get(queueName) != null)
+ // {
+ // currentRunningCounters.get(queueName).cancel(true);
+ // currentRunningCounters.remove(queueName);
+ // }
+ // messageCounter.sample();
+ // return messageCounter;
+ // }
+ //
+ // public MessageCounter getMessageCounter(SimpleString queueName)
+ // {
+ // MessageCounter messageCounter = currentCounters.get(queueName);
+ // if (messageCounter != null && currentRunningCounters.get(queueName) ==
+ // null)
+ // {
+ // messageCounter.sample();
+ // }
+ // return messageCounter;
+ // }
+ //
+ //
+ // public Collection<MessageCounter> getMessageCounters()
+ // {
+ // for (String s : currentCounters.keySet())
+ // {
+ // currentCounters.get(s).sample();
+ // }
+ // return currentCounters.values();
+ // }
+ //
+ // public void resetMessageCounter(SimpleString queue)
+ // {
+ // MessageCounter messageCounter = currentCounters.get(queue);
+ // if (messageCounter != null)
+ // {
+ // messageCounter.resetCounter();
+ // }
+ // }
+ //
+ // public void resetMessageCounters()
+ // {
+ // Set<String> counterNames = currentCounters.keySet();
+ // for (String counterName : counterNames)
+ // {
+ // resetMessageCounter(counterName);
+ // }
+ // }
+ //
+ // public void resetMessageCounterHistory(SimpleString queue)
+ // {
+ // MessageCounter messageCounter = currentCounters.get(queue);
+ // if (messageCounter != null)
+ // {
+ // messageCounter.resetHistory();
+ // }
+ // }
+ //
+ // public void resetMessageCounterHistories()
+ // {
+ // Set<String> counterNames = currentCounters.keySet();
+ // for (String counterName : counterNames)
+ // {
+ // resetMessageCounterHistory(counterName);
+ // }
+ // }
+ //
+ // public List<MessageCounter> stopAllMessageCounters() throws Exception
+ // {
+ // Set<String> counterNames = currentCounters.keySet();
+ // List<MessageCounter> messageCounters = new ArrayList<MessageCounter>();
+ // for (String counterName : counterNames)
+ // {
+ // messageCounters.add(stopMessageCounter(counterName));
+ // }
+ // return messageCounters;
+ // }
+ //
+ // public void unregisterAllMessageCounters() throws Exception
+ // {
+ // Set<String> counterNames = currentCounters.keySet();
+ // for (String counterName : counterNames)
+ // {
+ // unregisterMessageCounter(counterName);
+ // }
+ // }
+ //
+ // public int getConsumerCountForQueue(SimpleString queue) throws Exception
+ // {
+ // return getQueue(queue).getConsumerCount();
+ // }
+ //
+ // public List<ServerConnection> getActiveConnections()
+ // {
+ // return messagingServer.getConnectionManager().getActiveConnections();
+ // }
-// public void changeMessagePriority(String queue, String filter, int priority) throws Exception
-// {
-// Filter actFilter = new FilterImpl(filter);
-// List<MessageReference> allRefs = getQueue(queue).list(actFilter);
-// for (MessageReference messageReference : allRefs)
-// {
-// List<MessageReference> allRefsForMessage = messageReference.getMessage().getReferences();
-// for (MessageReference reference : allRefsForMessage)
-// {
-// reference.getQueue().changePriority(reference, priority);
-// }
-// messageReference.getMessage().setPriority((byte) priority);
-// }
-//
-// }
-//
-// public Set<SimpleString> listAvailableAddresses()
-// {
-// return messagingServer.getPostOffice().listAllDestinations();
-// }
+ // public void moveMessages(String fromQueue, String toQueue, String filter)
+ // throws Exception
+ // {
+ // Filter actFilter = new FilterImpl(filter);
+ // Queue from = getQueue(fromQueue);
+ // Queue to = getQueue(toQueue);
+ // List<MessageReference> messageReferences = from.list(actFilter);
+ // for (MessageReference messageReference : messageReferences)
+ // {
+ // from.move(messageReference, to, messagingServer.getPersistenceManager());
+ // }
+ //
+ // }
+ //
+ // public void expireMessages(SimpleString queue, SimpleString filter) throws
+ // Exception
+ // {
+ // Filter actFilter = new FilterImpl(filter);
+ // List<MessageReference> allRefs = getQueue(queue).list(actFilter);
+ // for (MessageReference messageReference : allRefs)
+ // {
+ // messageReference.getMessage().setExpiration(System.currentTimeMillis());
+ // }
+ // }
+ // public void changeMessagePriority(String queue, String filter, int
+ // priority) throws Exception
+ // {
+ // Filter actFilter = new FilterImpl(filter);
+ // List<MessageReference> allRefs = getQueue(queue).list(actFilter);
+ // for (MessageReference messageReference : allRefs)
+ // {
+ // List<MessageReference> allRefsForMessage =
+ // messageReference.getMessage().getReferences();
+ // for (MessageReference reference : allRefsForMessage)
+ // {
+ // reference.getQueue().changePriority(reference, priority);
+ // }
+ // messageReference.getMessage().setPriority((byte) priority);
+ // }
+ //
+ // }
+ //
+ // public Set<SimpleString> listAvailableAddresses()
+ // {
+ // return messagingServer.getPostOffice().listAllDestinations();
+ // }
+
public Configuration getConfiguration()
{
return configuration;
@@ -496,28 +623,26 @@
return binding.getQueue();
}
- // Private ---------------------------------------------------------------------------
+ // Private
+ // --------------------------------------------------------------------
+ // -------
+ // public void start() throws Exception
+ // {
+ // //scheduler = Executors.newScheduledThreadPool(maxMessageCounters);
+ // }
+ //
+ // public void stop() throws Exception
+ // {
+ // // if (scheduler != null)
+ // // {
+ // // scheduler.shutdown();
+ // // }
+ // }
-
-
-
-// public void start() throws Exception
-// {
-// //scheduler = Executors.newScheduledThreadPool(maxMessageCounters);
-// }
-//
-// public void stop() throws Exception
-// {
-//// if (scheduler != null)
-//// {
-//// scheduler.shutdown();
-//// }
-// }
-
-// protected void finalize() throws Throwable
-// {
-// super.finalize();
-//
-// }
+ // protected void finalize() throws Throwable
+ // {
+ // super.finalize();
+ //
+ // }
}
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -269,21 +269,7 @@
throw new IllegalArgumentException("invalid newPriority value: "
+ newPriority + ". It must be between 0 and 9 (both included)");
}
- List<MessageReference> refs = queue.list(null);
- for (MessageReference ref : refs)
- {
- ServerMessage message = ref.getMessage();
- if (message.getMessageID() == messageID)
- {
- message.setPriority((byte) newPriority);
- // delete and add the reference so that it
- // goes to the right queues for the new priority
- queue.deleteReference(messageID, storageManager);
- queue.addLast(ref);
- return true;
- }
- }
- return false;
+ return queue.changeMessagePriority(messageID, (byte) newPriority, storageManager, postOffice, queueSettingsRepository);
}
// StandardMBean overrides ---------------------------------------
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -131,6 +131,11 @@
PostOffice postOffice,
HierarchicalRepository<QueueSettings> queueSettingsRepository)
throws Exception;
+
+ boolean changeMessagePriority(long messageID, byte newPriority, StorageManager storageManager,
+ PostOffice postOffice,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ throws Exception;
boolean moveMessage(long messageID,
Binding toBinding, StorageManager storageManager,
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -51,6 +51,7 @@
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
@@ -578,6 +579,28 @@
return false;
}
+ public boolean changeMessagePriority(long messageID, byte newPriority,
+ StorageManager storageManager, PostOffice postOffice,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ throws Exception
+ {
+ List<MessageReference> refs = list(null);
+ for (MessageReference ref : refs)
+ {
+ ServerMessage message = ref.getMessage();
+ if (message.getMessageID() == messageID)
+ {
+ message.setPriority(newPriority);
+ // delete and add the reference so that it
+ // goes to the right queues for the new priority
+ deleteReference(messageID, storageManager);
+ addLast(ref);
+ return true;
+ }
+ }
+ return false;
+ }
+
public void lock()
{
lock.lock();
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -18,361 +18,435 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.jms.server;
import java.io.Serializable;
import java.util.List;
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.jms.JBossQueue;
-
/**
* The JMS Management interface.
- *
+ *
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
*/
public interface JMSServerManager extends Serializable
{
-
+
String getVersion();
/**
* Has the Server been started.
+ *
* @return true if the server us running
*/
boolean isStarted();
/**
* Creates a JMS Queue.
- * @param queueName The name of the queue to create
- * @param jndiBinding the name of the binding for JNDI
- * @return true if the queue is created or if it existed and was added to JNDI
- * @throws Exception if problems were encountered creating the queue.
+ *
+ * @param queueName
+ * The name of the queue to create
+ * @param jndiBinding
+ * the name of the binding for JNDI
+ * @return true if the queue is created or if it existed and was added to
+ * JNDI
+ * @throws Exception
+ * if problems were encountered creating the queue.
*/
boolean createQueue(String queueName, String jndiBinding) throws Exception;
/**
* Creates a JMS Topic
- * @param topicName the name of the topic
- * @param jndiBinding the name of the binding for JNDI
- * @return true if the topic was created or if it existed and was added to JNDI
- * @throws Exception if a problem occurred creating the topic
+ *
+ * @param topicName
+ * the name of the topic
+ * @param jndiBinding
+ * the name of the binding for JNDI
+ * @return true if the topic was created or if it existed and was added to
+ * JNDI
+ * @throws Exception
+ * if a problem occurred creating the topic
*/
boolean createTopic(String topicName, String jndiBinding) throws Exception;
/**
* destroys a queue and removes it from JNDI
- * @param name the name of the queue to destroy
+ *
+ * @param name
+ * the name of the queue to destroy
* @return true if destroyed
- * @throws Exception if a problem occurred destroying the queue
+ * @throws Exception
+ * if a problem occurred destroying the queue
*/
boolean destroyQueue(String name) throws Exception;
/**
* destroys a topic and removes it from JNDI
- * @param name the name of the topic to destroy
+ *
+ * @param name
+ * the name of the topic to destroy
* @return true if the topic was destroyed
- * @throws Exception if a problem occurred destroying the topic
+ * @throws Exception
+ * if a problem occurred destroying the topic
*/
boolean destroyTopic(String name) throws Exception;
-// /**
-// * returns a list of all the JMS queues
-// * @return all queues
-// */
-// Set<String> listAllQueues();
-//
-// /**
-// * returns a list of all the JMS topics
-// * @return all topics
-// */
-// Set<String> listAllTopics();
-//
-// /**
-// * returns all the temporary destinations
-// * @return all temporary destinations
-// */
-// Set<String> listTemporaryDestinations();
+ // /**
+ // * returns a list of all the JMS queues
+ // * @return all queues
+ // */
+ // Set<String> listAllQueues();
+ //
+ // /**
+ // * returns a list of all the JMS topics
+ // * @return all topics
+ // */
+ // Set<String> listAllTopics();
+ //
+ // /**
+ // * returns all the temporary destinations
+ // * @return all temporary destinations
+ // */
+ // Set<String> listTemporaryDestinations();
/**
* Creates a connection factory
- * @param name the name of this connection factory
- * @param clientID the client id
- * @param dupsOKBatchSize the bath size
- * @param consumerWindowSize The consumer window size
- * @param consumerMaxRate the Consumer max rate
- * @param producerWindowSize the producer window size
- * @param producerMaxRate the producer max rate
- * @param jndiBinding the binding name for JNDI
+ *
+ * @param name
+ * the name of this connection factory
+ * @param clientID
+ * the client id
+ * @param dupsOKBatchSize
+ * the bath size
+ * @param consumerWindowSize
+ * The consumer window size
+ * @param consumerMaxRate
+ * the Consumer max rate
+ * @param producerWindowSize
+ * the producer window size
+ * @param producerMaxRate
+ * the producer max rate
+ * @param jndiBinding
+ * the binding name for JNDI
* @return true if the connection factory was created
- * @throws Exception if a problem occurred creating the connection factory
+ * @throws Exception
+ * if a problem occurred creating the connection factory
*/
boolean createConnectionFactory(String name, String clientID,
- int dupsOKBatchSize,
- int consumerWindowSize, int consumerMaxRate,
- int producerWindowSize, int producerMaxRate,
- boolean blockOnAcknowledge,
- boolean defaultSendNonPersistentMessagesBlocking,
- boolean defaultSendPersistentMessagesBlocking,
- String jndiBinding) throws Exception;
+ int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
+ int producerWindowSize, int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean defaultSendNonPersistentMessagesBlocking,
+ boolean defaultSendPersistentMessagesBlocking, String jndiBinding)
+ throws Exception;
/**
* Creates a connection factory
- * @param name the name of this connection factory
- * @param clientID the client id
- * @param dupsOKBatchSize the bath size
- * @param consumerWindowSize The consumer window size
- * @param consumerMaxRate the Consumer max rate
- * @param producerWindowSize the producer window size
- * @param producerMaxRate the producer max rate
- * @param jndiBindings the binding names for JNDI
+ *
+ * @param name
+ * the name of this connection factory
+ * @param clientID
+ * the client id
+ * @param dupsOKBatchSize
+ * the bath size
+ * @param consumerWindowSize
+ * The consumer window size
+ * @param consumerMaxRate
+ * the Consumer max rate
+ * @param producerWindowSize
+ * the producer window size
+ * @param producerMaxRate
+ * the producer max rate
+ * @param jndiBindings
+ * the binding names for JNDI
* @return true if the connection factory was created
- * @throws Exception if a problem occurred creating the connection factory
+ * @throws Exception
+ * if a problem occurred creating the connection factory
*/
- boolean createConnectionFactory(String name, String clientID, int dupsOKBatchSize,
- int consumerWindowSize, int consumerMaxRate,
- int producerWindowSize, int producerMaxRate,
- boolean blockOnAcknowledge,
- boolean defaultSendNonPersistentMessagesBlocking,
- boolean defaultSendPersistentMessagesBlocking,
- List<String> jndiBindings) throws Exception;
+ boolean createConnectionFactory(String name, String clientID,
+ int dupsOKBatchSize, int consumerWindowSize, int consumerMaxRate,
+ int producerWindowSize, int producerMaxRate,
+ boolean blockOnAcknowledge,
+ boolean defaultSendNonPersistentMessagesBlocking,
+ boolean defaultSendPersistentMessagesBlocking,
+ List<String> jndiBindings) throws Exception;
/**
* destroys a connection factory.
- * @param name the name of the connection factory to destroy
+ *
+ * @param name
+ * the name of the connection factory to destroy
* @return true if the connection factory was destroyed
- * @throws Exception if a problem occurred destroying the connection factory
+ * @throws Exception
+ * if a problem occurred destroying the connection factory
*/
boolean destroyConnectionFactory(String name) throws Exception;
-// /**
-// * list all messages for a specific queue
-// * @param queue the queue to inspect
-// * @return all messages
-// * @throws Exception if a problem occurred
-// */
-// public List<Message> listMessagesForQueue(String queue) throws Exception;
-//
-// /**
-// * list the messages on a specific queue dependant on the ListType.
-// * ListType.ALL returns all messages
-// * ListType.DURABLE returns all durable messages
-// * ListType.NON_DURABLE returns all non durable messages
-// * @param queue the queue to inspect
-// * @param listType the list type.
-// * @return the messages
-// * @throws Exception if a problem occurred
-// */
-// public List<Message> listMessagesForQueue(String queue, ListType listType) throws Exception;
-//
-// /**
-// * list all messages for a specific subscription
-// * @param subscription the subscription to inspect
-// * @return all messages
-// * @throws Exception if a problem occurred
-// */
-// public List<Message> listMessagesForSubscription(String subscription) throws Exception;
-//
-// /**
-// * list the messages on a specific subscription dependant on the ListType.
-// * ListType.ALL returns all messages
-// * ListType.DURABLE returns all durable messages
-// * ListType.NON_DURABLE returns all non durable messages
-// * @param subscription the subscription to inspect
-// * @param listType the list type
-// * @return the messages
-// * @throws Exception if a problem occurred
-// */
-// public List<Message> listMessagesForSubscription(String subscription, ListType listType) throws Exception;
-//
-// /**
-// * removes a particular message from a queue
-// * @param queue the name of the queue
-// * @param messageId the id of the message to remove
-// * @throws Exception if a problem occurred
-// */
-// // void removeMessageFromQueue(String queue, String messageId) throws Exception;
-//
-// /**
-// * removes a particular message from a topic
-// * @param topic the name of the topic
-// * @param messageId the id of the message
-// * @throws Exception if a problem occurred
-// */
-// // void removeMessageFromTopic(String topic, String messageId) throws Exception;
+ // /**
+ // * list all messages for a specific queue
+ // * @param queue the queue to inspect
+ // * @return all messages
+ // * @throws Exception if a problem occurred
+ // */
+ // public List<Message> listMessagesForQueue(String queue) throws Exception;
+ //
+ // /**
+ // * list the messages on a specific queue dependant on the ListType.
+ // * ListType.ALL returns all messages
+ // * ListType.DURABLE returns all durable messages
+ // * ListType.NON_DURABLE returns all non durable messages
+ // * @param queue the queue to inspect
+ // * @param listType the list type.
+ // * @return the messages
+ // * @throws Exception if a problem occurred
+ // */
+ // public List<Message> listMessagesForQueue(String queue, ListType listType)
+ // throws Exception;
+ //
+ // /**
+ // * list all messages for a specific subscription
+ // * @param subscription the subscription to inspect
+ // * @return all messages
+ // * @throws Exception if a problem occurred
+ // */
+ // public List<Message> listMessagesForSubscription(String subscription)
+ // throws Exception;
+ //
+ // /**
+ // * list the messages on a specific subscription dependant on the ListType.
+ // * ListType.ALL returns all messages
+ // * ListType.DURABLE returns all durable messages
+ // * ListType.NON_DURABLE returns all non durable messages
+ // * @param subscription the subscription to inspect
+ // * @param listType the list type
+ // * @return the messages
+ // * @throws Exception if a problem occurred
+ // */
+ // public List<Message> listMessagesForSubscription(String subscription,
+ // ListType listType) throws Exception;
+ //
+ // /**
+ // * removes a particular message from a queue
+ // * @param queue the name of the queue
+ // * @param messageId the id of the message to remove
+ // * @throws Exception if a problem occurred
+ // */
+ // // void removeMessageFromQueue(String queue, String messageId) throws
+ // Exception;
+ //
+ // /**
+ // * removes a particular message from a topic
+ // * @param topic the name of the topic
+ // * @param messageId the id of the message
+ // * @throws Exception if a problem occurred
+ // */
+ // // void removeMessageFromTopic(String topic, String messageId) throws
+ // Exception;
/**
* removes all messages from a particular destination
- * @param queue the destination
- * @throws Exception if a problem occurred
+ *
+ * @param queue
+ * the destination
+ * @throws Exception
+ * if a problem occurred
*/
void removeAllMessages(JBossDestination destination) throws Exception;
-//
-// /**
-// * moves a message from one queue to another
-// * @param fromQueue the name of the queue to find the message
-// * @param toQueue the name of the queue to move the message to
-// * @param messageID the id of the message
-// * @throws Exception if a problem occurred
-// */
-// //void moveMessage(String fromQueue, String toQueue, String messageID) throws Exception;
-//
-// /**
-// * expires a message
-// * @param queue the name of the queue
-// * @param messageId the message id
-// * @throws Exception if a problem occurred
-// */
-// void expireMessage(String queue, String messageId) throws Exception;
-//
-// /**
-// * changes the priority of a message.
-// * @param queue the name of the queue
-// * @param messageId the id of the message
-// * @param priority the priority to change the message to
-// * @throws Exception if a problem occurred
-// */
-// // void changeMessagePriority(String queue, String messageId, int priority) throws Exception;
-//
-//
-// /**
-// * lists all the subscriptions for a specific topic for a specific ListType.
-// * ListType.ALL returns all subscriptions
-// * ListType.DURABLE returns all durable subscriptions
-// * ListType.NON_DURABLE returns all non durable subscriptions
-// *
-// * @param topicName the name of the topic
-// * @param listType the list type
-// * @return the subscriptions
-// * @throws Exception if a problem occurred
-// */
-// List<SubscriptionInfo> listSubscriptions(String topicName, ListType listType) throws Exception;
-//
-// /**
-// * count the subscriptions a topic currently has
-// * @param topic the name of the topic
-// * @return the number of subscriptions
-// * @throws Exception if a problem occurred
-// */
-// int getSubscriptionsCountForTopic(String topic) throws Exception;
-//
-// /**
-// * count the subscriptions a topic currently has of a specific type.
-// * ListType.ALL returns all subscriptions
-// * ListType.DURABLE returns all durable subscriptions
-// * ListType.NON_DURABLE returns all non durable subscriptions
-// *
-// * @param topic the name of the topic
-// * @param listType the list type
-// * @return the number of subscriptions
-// * @throws Exception if a problem occurred
-// */
-// int getSubscriptionsCountForTopic(String topic, ListType listType) throws Exception;
-//
-// /**
-// * drops a particular subscription
-// *
-// * @param subscription the id of the subscription
-// * @throws Exception if a problem occurred
-// */
-// void dropSubscription(String subscription) throws Exception;
-//
-// /**
-// * count the consumers for a specific queue
-// * @param queue the name of the queue
-// * @return the number of consumers
-// * @throws Exception if a problem occurred
-// */
-// int getConsumerCountForQueue(String queue) throws Exception;
-//
-// /**
-// * returns info on all the current active connections
-// * @return the connections info
-// * @throws Exception if a problem occurred
-// */
-// List<ConnectionInfo> getConnections() throws Exception;
-//
-// /**
-// * return the connections info for a particular user.
-// * @param user the user
-// * @return the connections info
-// * @throws Exception if a problem occurred
-// */
-// List<ConnectionInfo> getConnectionsForUser(String user) throws Exception;
-//
-// /**
-// * drops the connection with the specified client id
-// * @param clientId the client id
-// * @throws Exception if a problem occurred
-// */
-// void dropConnection(long id) throws Exception;
-//
-// /**
-// * drop all the connections for a specific user
-// * @param user the user
-// * @throws Exception if a problem occurred
-// */
-// void dropConnectionsForUser(String user) throws Exception;
-//
-// /**
-// * list all the sessions info
-// * @return the session info
-// * @throws Exception if a problem occurred
-// */
-// //public List<SessionInfo> getSessions() throws Exception;
-//
-// /**
-// * get the session info for a particular connection with the specified client id
-// * @param clientid the client id
-// * @return the session info
-// * @throws Exception if a problem occurred
-// */
-// // public List<SessionInfo> getSessionsForConnection(long id) throws Exception;
-//
-// /**
-// * get the session info for a particular user
-// * @param user the user
-// * @return the session info
-// * @throws Exception if a problem occurred
-// */
-// // public List<SessionInfo> getSessionsForUser(String user) throws Exception;
-//
-// /**
-// * Start gathering delivery statistics for all queues
-// * @throws Exception if a problem occurred
-// */
-// void startGatheringStatistics() throws Exception;
-//
-// /**
-// * Start gathering delivery statistics for a specified queue
-// * @param queue the name of the queue
-// * @throws Exception if a problem occurred
-// */
-// void startGatheringStatisticsForQueue(String queue) throws Exception;
-//
-// /**
-// * stop gathering delivery statistics for all queues
-// * @return the delivery statistics at the time of stopping gathering
-// * @throws Exception if a problem occurred
-// */
-// List<MessageStatistics> stopGatheringStatistics() throws Exception;
-//
-// /**
-// * stop gathering statistics for a specified queue
-// * @param queue the name of the queue
-// * @return the delivery statistics for that queue at the time of stopping gathering
-// * @throws Exception if a problem occurred
-// */
-// MessageStatistics stopGatheringStatisticsForQueue(String queue) throws Exception;
-//
-// /**
-// * list all message delivery statistics. This will include statistics up to the point this method is called.
-// * The gathering of statistics will carry on.
-// * @return the delivery statistics
-// * @throws Exception if a problem occurred
-// */
-// List<MessageStatistics> getStatistics() throws Exception;
+ boolean removeMessage(long messageID, JBossDestination destination)
+ throws Exception;
+
+ int expireMessages(Filter filter, JBossDestination destination)
+ throws Exception;
+
+ public QueueSettings getSettings(JBossDestination destination);
+
+ int sendMessagesToDLQ(Filter filter, JBossDestination destination)
+ throws Exception;
+
+ int changeMessagesPriority(Filter filter, byte newPriority,
+ JBossDestination destination) throws Exception;
+
+ //
+ // /**
+ // * moves a message from one queue to another
+ // * @param fromQueue the name of the queue to find the message
+ // * @param toQueue the name of the queue to move the message to
+ // * @param messageID the id of the message
+ // * @throws Exception if a problem occurred
+ // */
+ // //void moveMessage(String fromQueue, String toQueue, String messageID)
+ // throws Exception;
+ //
+ // /**
+ // * expires a message
+ // * @param queue the name of the queue
+ // * @param messageId the message id
+ // * @throws Exception if a problem occurred
+ // */
+ // void expireMessage(String queue, String messageId) throws Exception;
+ //
+ // /**
+ // * changes the priority of a message.
+ // * @param queue the name of the queue
+ // * @param messageId the id of the message
+ // * @param priority the priority to change the message to
+ // * @throws Exception if a problem occurred
+ // */
+ // // void changeMessagePriority(String queue, String messageId, int
+ // priority) throws Exception;
+ //
+ //
+ // /**
+ // * lists all the subscriptions for a specific topic for a specific
+ // ListType.
+ // * ListType.ALL returns all subscriptions
+ // * ListType.DURABLE returns all durable subscriptions
+ // * ListType.NON_DURABLE returns all non durable subscriptions
+ // *
+ // * @param topicName the name of the topic
+ // * @param listType the list type
+ // * @return the subscriptions
+ // * @throws Exception if a problem occurred
+ // */
+ // List<SubscriptionInfo> listSubscriptions(String topicName, ListType
+ // listType) throws Exception;
+ //
+ // /**
+ // * count the subscriptions a topic currently has
+ // * @param topic the name of the topic
+ // * @return the number of subscriptions
+ // * @throws Exception if a problem occurred
+ // */
+ // int getSubscriptionsCountForTopic(String topic) throws Exception;
+ //
+ // /**
+ // * count the subscriptions a topic currently has of a specific type.
+ // * ListType.ALL returns all subscriptions
+ // * ListType.DURABLE returns all durable subscriptions
+ // * ListType.NON_DURABLE returns all non durable subscriptions
+ // *
+ // * @param topic the name of the topic
+ // * @param listType the list type
+ // * @return the number of subscriptions
+ // * @throws Exception if a problem occurred
+ // */
+ // int getSubscriptionsCountForTopic(String topic, ListType listType) throws
+ // Exception;
+ //
+ // /**
+ // * drops a particular subscription
+ // *
+ // * @param subscription the id of the subscription
+ // * @throws Exception if a problem occurred
+ // */
+ // void dropSubscription(String subscription) throws Exception;
+ //
+ // /**
+ // * count the consumers for a specific queue
+ // * @param queue the name of the queue
+ // * @return the number of consumers
+ // * @throws Exception if a problem occurred
+ // */
+ // int getConsumerCountForQueue(String queue) throws Exception;
+ //
+ // /**
+ // * returns info on all the current active connections
+ // * @return the connections info
+ // * @throws Exception if a problem occurred
+ // */
+ // List<ConnectionInfo> getConnections() throws Exception;
+ //
+ // /**
+ // * return the connections info for a particular user.
+ // * @param user the user
+ // * @return the connections info
+ // * @throws Exception if a problem occurred
+ // */
+ // List<ConnectionInfo> getConnectionsForUser(String user) throws Exception;
+ //
+ // /**
+ // * drops the connection with the specified client id
+ // * @param clientId the client id
+ // * @throws Exception if a problem occurred
+ // */
+ // void dropConnection(long id) throws Exception;
+ //
+ // /**
+ // * drop all the connections for a specific user
+ // * @param user the user
+ // * @throws Exception if a problem occurred
+ // */
+ // void dropConnectionsForUser(String user) throws Exception;
+ //
+ // /**
+ // * list all the sessions info
+ // * @return the session info
+ // * @throws Exception if a problem occurred
+ // */
+ // //public List<SessionInfo> getSessions() throws Exception;
+ //
+ // /**
+ // * get the session info for a particular connection with the specified
+ // client id
+ // * @param clientid the client id
+ // * @return the session info
+ // * @throws Exception if a problem occurred
+ // */
+ // // public List<SessionInfo> getSessionsForConnection(long id) throws
+ // Exception;
+ //
+ // /**
+ // * get the session info for a particular user
+ // * @param user the user
+ // * @return the session info
+ // * @throws Exception if a problem occurred
+ // */
+ // // public List<SessionInfo> getSessionsForUser(String user) throws
+ // Exception;
+ //
+ // /**
+ // * Start gathering delivery statistics for all queues
+ // * @throws Exception if a problem occurred
+ // */
+ // void startGatheringStatistics() throws Exception;
+ //
+ // /**
+ // * Start gathering delivery statistics for a specified queue
+ // * @param queue the name of the queue
+ // * @throws Exception if a problem occurred
+ // */
+ // void startGatheringStatisticsForQueue(String queue) throws Exception;
+ //
+ // /**
+ // * stop gathering delivery statistics for all queues
+ // * @return the delivery statistics at the time of stopping gathering
+ // * @throws Exception if a problem occurred
+ // */
+ // List<MessageStatistics> stopGatheringStatistics() throws Exception;
+ //
+ // /**
+ // * stop gathering statistics for a specified queue
+ // * @param queue the name of the queue
+ // * @return the delivery statistics for that queue at the time of stopping
+ // gathering
+ // * @throws Exception if a problem occurred
+ // */
+ // MessageStatistics stopGatheringStatisticsForQueue(String queue) throws
+ // Exception;
+ //
+ // /**
+ // * list all message delivery statistics. This will include statistics up to
+ // the point this method is called.
+ // * The gathering of statistics will carry on.
+ // * @return the delivery statistics
+ // * @throws Exception if a problem occurred
+ // */
+ // List<MessageStatistics> getStatistics() throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -32,8 +32,11 @@
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.MessagingServerManagement;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.JBossTopic;
@@ -201,6 +204,11 @@
return true;
}
+ public QueueSettings getSettings(JBossDestination destination)
+ {
+ return messagingServerManagement.getQueueSettings(destination.getSimpleAddress());
+ }
+
// public Set<String> listAllQueues()
// {
// Set<String> availableAddresses = messagingServerManagement.listAvailableAddresses();
@@ -368,6 +376,32 @@
messagingServerManagement.removeAllMessagesForAddress(destination.getSimpleAddress());
}
+ public boolean removeMessage(long messageID, JBossDestination destination) throws Exception
+ {
+ return messagingServerManagement.removeMessageFromAddress(messageID, destination.getSimpleAddress());
+ }
+
+ public int expireMessages(Filter filter, JBossDestination destination) throws Exception
+ {
+ List<MessageReference> refs = messagingServerManagement.expireMessages(filter, destination.getSimpleAddress());
+
+ return refs.size();
+ }
+
+ public int sendMessagesToDLQ(Filter filter, JBossDestination destination) throws Exception
+ {
+ List<MessageReference> refs = messagingServerManagement.sendMessagesToDLQ(filter, destination.getSimpleAddress());
+
+ return refs.size();
+ }
+
+ public int changeMessagesPriority(Filter filter, byte newPriority,
+ JBossDestination destination) throws Exception
+ {
+ List<MessageReference> refs = messagingServerManagement.changeMessagesPriority(filter, newPriority, destination.getSimpleAddress());
+
+ return refs.size();
+ }
//
//// public void moveMessage(String fromQueue, String toQueue, String messageId) throws Exception
//// {
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -22,6 +22,7 @@
package org.jboss.messaging.jms.server.management;
+import static javax.management.MBeanOperationInfo.ACTION;
import static javax.management.MBeanOperationInfo.INFO;
import javax.management.openmbean.TabularData;
@@ -37,6 +38,14 @@
*/
public interface JMSQueueControlMBean extends DestinationControlMBean
{
+ // Attributes ----------------------------------------------------
+
+ String getExpiryQueue();
+
+ String getDLQ();
+
+ // Operations ----------------------------------------------------
+
@Operation(desc = "List all messages in the queue", impact = INFO)
TabularData listAllMessages() throws Exception;
@@ -44,4 +53,34 @@
TabularData listMessages(
@Parameter(name = "filter", desc = "A JMS Message filter") String filter)
throws Exception;
+
+ @Operation(desc = "Remove all the messages from the queue", impact = ACTION)
+ void removeAllMessages() throws Exception;
+
+ @Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
+ boolean removeMessage(
+ @Parameter(name = "messageID", desc = "A message ID") String messageID)
+ throws Exception;
+
+ @Operation(desc = "Expire the messages corresponding to the given filter (and returns the number of expired messages)", impact = ACTION)
+ int expireMessages(
+ @Parameter(name = "filter", desc = "A message filter") String filter)
+ throws Exception;
+
+ @Operation(desc = "Expire the message corresponding to the given messageID", impact = ACTION)
+ boolean expireMessage(
+ @Parameter(name = "messageID", desc = "A message ID") String messageID)
+ throws Exception;
+
+ @Operation(desc = "Send the message corresponding to the given messageID to the queue's Dead Letter Queue", impact = ACTION)
+ boolean sendMessageTDLQ(
+ @Parameter(name = "messageID", desc = "A message ID") String messageID)
+ throws Exception;
+
+ @Operation(desc = "Change the priority of the message corresponding to the given messageID", impact = ACTION)
+ boolean changeMessagePriority(
+ @Parameter(name = "messageID", desc = "A message ID") String messageID,
+ @Parameter(name = "newPriority", desc = "the new priority (between 0 and 9)") int newPriority)
+ throws Exception;
+
}
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -37,7 +37,10 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.client.JBossMessage;
import org.jboss.messaging.jms.client.SelectorTranslator;
import org.jboss.messaging.jms.server.JMSServerManager;
import org.jboss.messaging.jms.server.management.JMSMessageInfo;
@@ -64,6 +67,13 @@
// Static --------------------------------------------------------
+ private static Filter createFilterForJMSMessageID(String jmsMessageID)
+ throws Exception
+ {
+ return new FilterImpl(new SimpleString(JBossMessage.JBM_MESSAGE_ID
+ + " = '" + jmsMessageID + "'"));
+ }
+
// Constructors --------------------------------------------------
public JMSQueueControl(JBossQueue queue, Queue coreQueue,
@@ -106,6 +116,45 @@
return binding;
}
+ public String getDLQ()
+ {
+ QueueSettings settings = server.getSettings(managedQueue);
+ if (settings != null && settings.getDLQ() != null)
+ {
+ return JBossDestination.fromAddress(settings.getDLQ().toString())
+ .getName();
+ } else
+ {
+ return null;
+ }
+ }
+
+ public String getExpiryQueue()
+ {
+ QueueSettings settings = server.getSettings(managedQueue);
+ if (settings != null && settings.getExpiryQueue() != null)
+ {
+ return JBossDestination.fromAddress(
+ settings.getExpiryQueue().toString()).getName();
+ } else
+ {
+ return null;
+ }
+ }
+
+ public boolean removeMessage(String messageID) throws Exception
+ {
+ Filter filter = createFilterForJMSMessageID(messageID);
+ List<MessageReference> refs = coreQueue.list(filter);
+ if (refs.size() != 1)
+ {
+ throw new IllegalArgumentException(
+ "No message found for JMSMessageID: " + messageID);
+ }
+ return server.removeMessage(refs.get(0).getMessage().getMessageID(),
+ managedQueue);
+ }
+
public void removeAllMessages() throws Exception
{
server.removeAllMessages(managedQueue);
@@ -140,6 +189,62 @@
}
}
+ public boolean expireMessage(String messageID) throws Exception
+ {
+ Filter filter = createFilterForJMSMessageID(messageID);
+ List<MessageReference> refs = coreQueue.list(filter);
+ if (refs.size() != 1)
+ {
+ throw new IllegalArgumentException(
+ "No message found for JMSMessageID: " + messageID);
+ }
+ return server.expireMessages(filter, managedQueue) == 1;
+ }
+
+ public int expireMessages(String filterStr) throws Exception
+ {
+ try
+ {
+ Filter filter = filterStr == null ? null : new FilterImpl(
+ new SimpleString(SelectorTranslator
+ .convertToJBMFilterString(filterStr)));
+ return server.expireMessages(filter, managedQueue);
+ } catch (MessagingException e)
+ {
+ throw new IllegalStateException(e.getMessage());
+ }
+ }
+
+ public boolean sendMessageTDLQ(String messageID) throws Exception
+ {
+ Filter filter = createFilterForJMSMessageID(messageID);
+ List<MessageReference> refs = coreQueue.list(filter);
+ if (refs.size() != 1)
+ {
+ throw new IllegalArgumentException(
+ "No message found for JMSMessageID: " + messageID);
+ }
+ return server.sendMessagesToDLQ(filter, managedQueue) == 1;
+ }
+
+ public boolean changeMessagePriority(String messageID, int newPriority)
+ throws Exception
+ {
+ if (newPriority < 0 || newPriority > 9)
+ {
+ throw new IllegalArgumentException("invalid newPriority value: "
+ + newPriority + ". It must be between 0 and 9 (both included)");
+ }
+ Filter filter = createFilterForJMSMessageID(messageID);
+ List<MessageReference> refs = coreQueue.list(filter);
+ if (refs.size() != 1)
+ {
+ throw new IllegalArgumentException(
+ "No message found for JMSMessageID: " + messageID);
+ }
+ return server.changeMessagesPriority(filter, (byte) newPriority, managedQueue) == 1;
+ }
+
// StandardMBean overrides ---------------------------------------
@Override
Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -689,29 +689,23 @@
public void testChangeMessagePriority() throws Exception
{
long messageID = randomLong();
- int newPriority = 5;
+ byte newPriority = 5;
List<MessageReference> refs = new ArrayList<MessageReference>();
MessageReference ref = createMock(MessageReference.class);
- ServerMessage message = createMock(ServerMessage.class);
- expect(message.getMessageID()).andStubReturn(messageID);
- expect(ref.getMessage()).andReturn(message);
- message.setPriority((byte) newPriority);
refs.add(ref);
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
Queue queue = createMock(Queue.class);
- expect(queue.list(null)).andReturn(refs);
- expect(queue.deleteReference(messageID, storageManager)).andReturn(true);
- expect(queue.addLast(ref)).andReturn(HandleStatus.HANDLED);
+ expect(queue.changeMessagePriority(messageID, newPriority, storageManager, postOffice, repository)).andReturn(true);
- replay(queue, storageManager, postOffice, repository, ref, message);
+ replay(queue, storageManager, postOffice, repository, ref);
QueueControl control = new QueueControl(queue, storageManager,
postOffice, repository);
assertTrue(control.changeMessagePriority(messageID, newPriority));
- verify(queue, storageManager, postOffice, repository, ref, message);
+ verify(queue, storageManager, postOffice, repository, ref);
}
public void testChangeMessagePriorityWithInvalidPriorityValues()
@@ -750,13 +744,13 @@
public void testChangeMessagePriorityWithNoMessageID() throws Exception
{
long messageID = randomLong();
- int newPriority = 5;
+ byte newPriority = 5;
Queue queue = createMock(Queue.class);
- expect(queue.list(null)).andReturn(new ArrayList<MessageReference>());
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
-
+ expect(queue.changeMessagePriority(messageID, newPriority, storageManager, postOffice, repository)).andReturn(false);
+
replay(queue, storageManager, postOffice, repository);
QueueControl control = new QueueControl(queue, storageManager,
Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java 2008-08-01 08:41:23 UTC (rev 4762)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java 2008-08-01 13:28:18 UTC (rev 4763)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.unit.jms.server.management.impl;
import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.isA;
import static org.easymock.EasyMock.replay;
@@ -35,6 +36,7 @@
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -43,10 +45,12 @@
import junit.framework.TestCase;
+import org.easymock.EasyMock;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.server.JMSServerManager;
import org.jboss.messaging.jms.server.management.impl.JMSQueueControl;
@@ -162,6 +166,91 @@
verify(coreQueue, serverManager);
}
+ public void testGetDLQ() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ final String dlq = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ QueueSettings settings = new QueueSettings()
+ {
+ @Override
+ public SimpleString getDLQ()
+ {
+ return new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + dlq);
+ }
+ };
+ expect(serverManager.getSettings(queue)).andReturn(settings);
+
+ replay(coreQueue, serverManager);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ assertEquals(dlq, control.getDLQ());
+
+ verify(coreQueue, serverManager);
+ }
+
+ public void testGetExpiryQueue() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ final String expiryQueue = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ QueueSettings settings = new QueueSettings()
+ {
+ @Override
+ public SimpleString getExpiryQueue()
+ {
+ return new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX
+ + expiryQueue);
+ }
+ };
+ expect(serverManager.getSettings(queue)).andReturn(settings);
+
+ replay(coreQueue, serverManager);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ assertEquals(expiryQueue, control.getExpiryQueue());
+
+ verify(coreQueue, serverManager);
+ }
+
+ public void testRemoveMessage() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ String jmsMessageID = randomString();
+ long messageID = randomLong();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ MessageReference ref = createMock(MessageReference.class);
+ ServerMessage message = createMock(ServerMessage.class);
+ expect(message.getMessageID()).andReturn(messageID);
+ expect(ref.getMessage()).andReturn(message);
+ refs.add(ref);
+ expect(coreQueue.list(EasyMock.isA(Filter.class))).andReturn(refs);
+ expect(serverManager.removeMessage(messageID, queue)).andReturn(true);
+
+ replay(coreQueue, serverManager, ref, message);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ assertTrue(control.removeMessage(jmsMessageID));
+
+ verify(coreQueue, serverManager, ref, message);
+ }
+
public void testRemoveAllMessages() throws Exception
{
String jndiBinding = randomString();
@@ -248,6 +337,221 @@
}
verify(coreQueue, serverManager);
}
+
+ public void testExpireMessage() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ String jmsMessageID = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ MessageReference ref = createMock(MessageReference.class);
+ refs.add(ref);
+ expect(coreQueue.list(EasyMock.isA(Filter.class))).andReturn(refs);
+ expect(serverManager.expireMessages(isA(Filter.class), eq(queue)))
+ .andReturn(1);
+
+ replay(coreQueue, serverManager, ref);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ assertTrue(control.expireMessage(jmsMessageID));
+
+ verify(coreQueue, serverManager, ref);
+ }
+
+ public void testExpireMessageWithNoJMSMesageID() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ String jmsMessageID = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ expect(coreQueue.list(isA(Filter.class))).andReturn(
+ new ArrayList<MessageReference>());
+
+ replay(coreQueue, serverManager);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ try
+ {
+ control.expireMessage(jmsMessageID);
+ fail("IllegalArgumentException");
+ } catch (IllegalArgumentException e)
+ {
+ }
+
+ verify(coreQueue, serverManager);
+ }
+
+ public void testExpireMessages() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ int expiredMessage = randomInt();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ expect(serverManager.expireMessages(isA(Filter.class), eq(queue)))
+ .andReturn(expiredMessage);
+
+ replay(coreQueue, serverManager);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ assertEquals(expiredMessage, control.expireMessages("color = 'green'"));
+
+ verify(coreQueue, serverManager);
+ }
+
+ public void testSendMessageToDLQ() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ String jmsMessageID = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ MessageReference ref = createMock(MessageReference.class);
+ refs.add(ref);
+ expect(coreQueue.list(isA(Filter.class))).andReturn(refs);
+ expect(serverManager.sendMessagesToDLQ(isA(Filter.class), eq(queue)))
+ .andReturn(1);
+
+ replay(coreQueue, serverManager, ref);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ assertTrue(control.sendMessageTDLQ(jmsMessageID));
+
+ verify(coreQueue, serverManager, ref);
+ }
+
+ public void testSendMessageToDLQWithNoJMSMesageID() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ String jmsMessageID = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ expect(coreQueue.list(isA(Filter.class))).andReturn(
+ new ArrayList<MessageReference>());
+
+ replay(coreQueue, serverManager);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ try
+ {
+ control.sendMessageTDLQ(jmsMessageID);
+ fail("IllegalArgumentException");
+ } catch (IllegalArgumentException e)
+ {
+ }
+
+ verify(coreQueue, serverManager);
+ }
+
+ public void testChangeMessagePriority() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ byte newPriority = 5;
+ String jmsMessageID = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ MessageReference ref = createMock(MessageReference.class);
+ refs.add(ref);
+ expect(coreQueue.list(isA(Filter.class))).andReturn(refs);
+ expect(
+ serverManager.changeMessagesPriority(isA(Filter.class),
+ eq(newPriority), eq(queue))).andReturn(1);
+
+ replay(coreQueue, serverManager, ref);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ assertTrue(control.changeMessagePriority(jmsMessageID, newPriority));
+
+ verify(coreQueue, serverManager, ref);
+ }
+
+ public void testChangeMessagePriorityWithInvalidPriorityValues()
+ throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ String jmsMessageID = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+
+ replay(coreQueue, serverManager);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ try
+ {
+ control.changeMessagePriority(jmsMessageID, -1);
+ fail("IllegalArgumentException");
+ } catch (IllegalArgumentException e)
+ {
+ }
+
+ try
+ {
+ control.changeMessagePriority(jmsMessageID, 10);
+ fail("IllegalArgumentException");
+ } catch (IllegalArgumentException e)
+ {
+ }
+
+ verify(coreQueue, serverManager);
+ }
+
+ public void testChangeMessagePriorityWithNoJMSMesageID() throws Exception
+ {
+ String jndiBinding = randomString();
+ String name = randomString();
+ byte newPriority = 5;
+ String jmsMessageID = randomString();
+
+ JBossQueue queue = new JBossQueue(name);
+ Queue coreQueue = createMock(Queue.class);
+ JMSServerManager serverManager = createMock(JMSServerManager.class);
+ expect(coreQueue.list(isA(Filter.class))).andReturn(
+ new ArrayList<MessageReference>());
+
+ replay(coreQueue, serverManager);
+
+ JMSQueueControl control = new JMSQueueControl(queue, coreQueue,
+ jndiBinding, serverManager);
+ try
+ {
+ control.changeMessagePriority(jmsMessageID, newPriority);
+ fail("IllegalArgumentException");
+ } catch (IllegalArgumentException e)
+ {
+ }
+
+ verify(coreQueue, serverManager);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list