[jboss-cvs] JBoss Messaging SVN: r5439 - in trunk: src/main/org/jboss/messaging/core/management and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 27 10:59:16 EST 2008
Author: jmesnil
Date: 2008-11-27 10:59:16 -0500 (Thu, 27 Nov 2008)
New Revision: 5439
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/DestinationControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java
trunk/tests/src/org/jboss/messaging/tests/util/RandomUtil.java
Log:
refactored Queue and MessageReference so that methods dealing with many messages (e.g. by passing a filter) do the work within a single transaction
Modified: trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/filter/impl/FilterImpl.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -97,6 +97,18 @@
private final FilterParser parser = new FilterParser();
+ // Static ---------------------------------------------------------
+
+ /**
+ * @return <code>null</code> if <code>filterStr</code> is null, otherwise a valid filter
+ * @throws MessagingException if the string does not correspond to a valid filter
+ */
+ public static Filter createFilter(final String filterStr) throws MessagingException
+ {
+ Filter filter = (filterStr == null) ? null : new FilterImpl(new SimpleString(filterStr));
+ return filter;
+ }
+
// Constructors ---------------------------------------------------
public FilterImpl(final SimpleString str) throws MessagingException
Modified: trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -82,7 +82,7 @@
throws Exception;
@Operation(desc = "Remove all the messages from the queue", impact = ACTION)
- void removeAllMessages() throws Exception;
+ int removeAllMessages() throws Exception;
@Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
boolean removeMessage(
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -202,8 +202,7 @@
{
try
{
- Filter filter = filterStr == null ? null : new FilterImpl(
- new SimpleString(filterStr));
+ Filter filter = FilterImpl.createFilter(filterStr);
List<MessageReference> refs = queue.list(filter);
MessageInfo[] infos = new MessageInfo[refs.size()];
for (int i = 0; i < refs.size(); i++)
@@ -230,11 +229,11 @@
}
}
- public void removeAllMessages() throws Exception
+ public int removeAllMessages() throws Exception
{
try
{
- queue.deleteAllReferences(storageManager);
+ return queue.deleteAllReferences(storageManager);
} catch (MessagingException e)
{
throw new IllegalStateException(e.getMessage());
@@ -254,13 +253,8 @@
public int removeMatchingMessages(String filterStr) throws Exception
{
- Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
- List<MessageReference> refs = queue.list(filter);
- for (MessageReference ref : refs)
- {
- removeMessage(ref.getMessage().getMessageID());
- }
- return refs.size();
+ Filter filter = FilterImpl.createFilter(filterStr);
+ return queue.deleteMatchingReferences(filter, storageManager);
}
public boolean expireMessage(final long messageID) throws Exception
@@ -273,18 +267,8 @@
{
try
{
- Filter filter = null;
- if (filterStr != null)
- {
- filter = new FilterImpl(new SimpleString(filterStr));
- }
- List<MessageReference> refs = queue.list(filter);
- for (MessageReference ref : refs)
- {
- queue.expireMessage(ref.getMessage().getMessageID(),
- storageManager, postOffice, queueSettingsRepository);
- }
- return refs.size();
+ Filter filter = FilterImpl.createFilter(filterStr);
+ return queue.expireMessages(filter, storageManager, postOffice, queueSettingsRepository);
} catch (MessagingException e)
{
throw new IllegalStateException(e.getMessage());
@@ -301,21 +285,20 @@
+ otherQueueName);
}
- return queue.moveMessage(messageID, binding, storageManager, postOffice);
+ return queue.moveMessage(messageID, binding.getAddress(), storageManager, postOffice);
}
public int moveMatchingMessages(String filterStr, String otherQueueName) throws Exception
{
- Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
- List<MessageReference> refs = queue.list(filter);
- synchronized (queue)
+ Filter filter = FilterImpl.createFilter(filterStr);
+ Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+ if (binding == null)
{
- for (MessageReference ref : refs)
- {
- moveMessage(ref.getMessage().getMessageID(), otherQueueName);
- }
- return refs.size();
+ throw new IllegalArgumentException("No queue found for "
+ + otherQueueName);
}
+
+ return queue.moveMessages(filter, binding.getAddress(), storageManager, postOffice);
}
public int moveAllMessages(String otherQueueName) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessageReference.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/server/MessageReference.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -23,10 +23,11 @@
package org.jboss.messaging.core.server;
import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.util.SimpleString;
/**
* A reference to a message.
@@ -71,10 +72,16 @@
void expire(StorageManager storageManager, PostOffice postOffice,
HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
-
- void move(Binding otherBinding, StorageManager persistenceManager, PostOffice postOffice) throws Exception;
+ void expire(Transaction tx,
+ StorageManager storageManager,
+ PostOffice postOffice,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+ void move(SimpleString toAddress, StorageManager persistenceManager, PostOffice postOffice) throws Exception;
+
+ void move(SimpleString toAddress, Transaction tx, StorageManager persistenceManager, boolean expiry) throws Exception;
+
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -28,7 +28,6 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -108,16 +107,27 @@
MessageReference getReference(long id);
- void deleteAllReferences(StorageManager storageManager) throws Exception;
+ int deleteAllReferences(StorageManager storageManager) throws Exception;
boolean deleteReference(long messageID, StorageManager storageManager)
throws Exception;
+ int deleteMatchingReferences(Filter filter, StorageManager storageManager)
+ throws Exception;
+
boolean expireMessage(long messageID, StorageManager storageManager,
PostOffice postOffice,
HierarchicalRepository<QueueSettings> queueSettingsRepository)
throws Exception;
+ /**
+ * Flags all the messages in the queue which match the filter as <em>expired</em>
+ */
+ int expireMessages(Filter filter,
+ StorageManager storageManager,
+ PostOffice postOffice,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+
void expireMessages(final StorageManager storageManager,
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
@@ -132,9 +142,11 @@
HierarchicalRepository<QueueSettings> queueSettingsRepository)
throws Exception;
- boolean moveMessage(long messageID, Binding toBinding,
+ boolean moveMessage(long messageID, SimpleString toAddress,
StorageManager storageManager, PostOffice postOffice) throws Exception;
+ int moveMessages(Filter filter, SimpleString toAddress, StorageManager storageManager, PostOffice postOffice) throws Exception;
+
void setBackup();
boolean activate();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -220,12 +220,51 @@
}
}
+
+ public void expire(final Transaction tx,
+ final StorageManager storageManager,
+ final PostOffice postOffice,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+ {
+ SimpleString expiryAddress = queueSettingsRepository.getMatch(queue.getName().toString()).getExpiryAddress();
- public void move(final Binding otherBinding, final StorageManager persistenceManager, final PostOffice postOffice) throws Exception
+ if (expiryAddress != null)
+ {
+ List<Binding> bindingList = postOffice.getBindingsForAddress(expiryAddress);
+
+ if (bindingList.isEmpty())
+ {
+ log.warn("Message has expired. No bindings for Expiry Address " + expiryAddress + " so dropping it");
+ }
+ else
+ {
+ move(expiryAddress, tx, storageManager, true);
+ }
+ }
+ else
+ {
+ log.warn("Message has expired. No expiry queue configured for queue " + queue.getName() + " so dropping it");
+
+ tx.addAcknowledgement(this);
+ }
+ }
+
+ public void move(final SimpleString toAddress, final StorageManager persistenceManager, final PostOffice postOffice) throws Exception
{
- move(otherBinding, persistenceManager, postOffice, false);
+ move(toAddress, persistenceManager, postOffice, false);
}
+
+ public void move(final SimpleString toAddress, final Transaction tx, final StorageManager persistenceManager, final boolean expiry) throws Exception
+ {
+ ServerMessage copyMessage = makeCopy(expiry, persistenceManager);
+ copyMessage.setDestination(toAddress);
+
+ tx.addMessage(copyMessage);
+
+ tx.addAcknowledgement(this);
+ }
+
// Public --------------------------------------------------------
public String toString()
@@ -241,24 +280,6 @@
// Private -------------------------------------------------------
- private void move(final Binding otherBinding,
- final StorageManager persistenceManager,
- final PostOffice postOffice,
- final boolean expiry) throws Exception
- {
- Transaction tx = new TransactionImpl(persistenceManager, postOffice);
-
- ServerMessage copyMessage = makeCopy(expiry, persistenceManager);
-
- copyMessage.setDestination(otherBinding.getAddress());
-
- tx.addMessage(copyMessage);
-
- tx.addAcknowledgement(this);
-
- tx.commit();
- }
-
private void move(final SimpleString address,
final StorageManager persistenceManager,
final PostOffice postOffice,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -28,7 +28,6 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.DistributionPolicy;
@@ -41,8 +40,8 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.messaging.util.SimpleString;
-import org.jboss.messaging.util.ConcurrentHashSet;
/**
* Implementation of a Queue TODO use Java 5 concurrent queue
@@ -403,8 +402,15 @@
return messagesAdded.get();
}
- public synchronized void deleteAllReferences(final StorageManager storageManager) throws Exception
+ public synchronized int deleteAllReferences(final StorageManager storageManager) throws Exception
{
+ return deleteMatchingReferences(null, storageManager);
+ }
+
+ public synchronized int deleteMatchingReferences(final Filter filter, final StorageManager storageManager) throws Exception
+ {
+ int count = 0;
+
Transaction tx = new TransactionImpl(storageManager, postOffice);
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -413,22 +419,29 @@
{
MessageReference ref = iter.next();
- deliveringCount.incrementAndGet();
-
- tx.addAcknowledgement(ref);
-
- iter.remove();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ tx.addAcknowledgement(ref);
+ iter.remove();
+ count++;
+ }
}
-
+
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
for (MessageReference messageReference : cancelled)
{
- deliveringCount.incrementAndGet();
-
- tx.addAcknowledgement(messageReference);
+ if (filter == null || filter.match(messageReference.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ tx.addAcknowledgement(messageReference);
+ count++;
+ }
}
tx.commit();
+
+ return count;
}
public synchronized boolean deleteReference(final long messageID, final StorageManager storageManager) throws Exception
@@ -478,6 +491,32 @@
return false;
}
+ public int expireMessages(final Filter filter,
+ final StorageManager storageManager,
+ final PostOffice postOffice,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
+ {
+ Transaction tx = new TransactionImpl(storageManager, postOffice);
+
+ int count = 0;
+ Iterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ ref.expire(tx, storageManager, postOffice, queueSettingsRepository);
+ iter.remove();
+ count++;
+ }
+ }
+
+ tx.commit();
+
+ return count;
+ }
public void expireMessages(final StorageManager storageManager,
final PostOffice postOffice,
@@ -522,7 +561,7 @@
}
public boolean moveMessage(final long messageID,
- final Binding toBinding,
+ final SimpleString toAddress,
final StorageManager storageManager,
final PostOffice postOffice) throws Exception
{
@@ -534,14 +573,50 @@
if (ref.getMessage().getMessageID() == messageID)
{
deliveringCount.incrementAndGet();
- ref.move(toBinding, storageManager, postOffice);
+ ref.move(toAddress, storageManager, postOffice);
iter.remove();
return true;
}
}
return false;
}
+
+ public synchronized int moveMessages(final Filter filter, final SimpleString toAddress, final StorageManager storageManager, final PostOffice postOffice) throws Exception
+ {
+ Transaction tx = new TransactionImpl(storageManager, postOffice);
+ int count = 0;
+ Iterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ ref.move(toAddress, tx, storageManager, false);
+ iter.remove();
+ count++;
+ }
+ }
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ for (MessageReference ref : cancelled)
+ {
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ ref.move(toAddress, tx, storageManager, false);
+ tx.addAcknowledgement(ref);
+ count++;
+ }
+ }
+
+ tx.commit();
+
+ return count;
+ }
+
public boolean changeMessagePriority(final long messageID,
final byte newPriority,
final StorageManager storageManager,
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/DestinationControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/DestinationControlMBean.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/DestinationControlMBean.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -22,6 +22,10 @@
package org.jboss.messaging.jms.server.management;
+import static javax.management.MBeanOperationInfo.ACTION;
+
+import org.jboss.messaging.core.management.Operation;
+
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -44,6 +48,7 @@
// Operations ----------------------------------------------------
- public void removeAllMessages() throws Exception;
+ @Operation(desc = "Remove all the messages from the destination", impact = ACTION)
+ int removeAllMessages() throws Exception;
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -77,9 +77,6 @@
@Parameter(name = "filter", desc = "A JMS Message filter") String filter)
throws Exception;
- @Operation(desc = "Remove all the messages from the queue", impact = ACTION)
- void removeAllMessages() throws Exception;
-
@Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
boolean removeMessage(
@Parameter(name = "messageID", desc = "A message ID") String messageID)
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -36,6 +36,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.DayCounterInfo;
import org.jboss.messaging.core.management.MessageCounterInfo;
import org.jboss.messaging.core.management.impl.MBeanInfoHelper;
@@ -50,7 +51,6 @@
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.jms.JBossDestination;
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.client.JBossMessage;
@@ -84,6 +84,12 @@
// Static --------------------------------------------------------
+ public static Filter createFilterFromJMSSelector(final String selectorStr) throws MessagingException
+ {
+ String filterStr = (selectorStr == null) ? null : SelectorTranslator.convertToJBMFilterString(selectorStr);
+ return FilterImpl.createFilter(filterStr);
+ }
+
private static Filter createFilterForJMSMessageID(String jmsMessageID)
throws Exception
{
@@ -227,25 +233,17 @@
{
try
{
- Filter filter = filterStr == null ? null : new FilterImpl(
- new SimpleString(SelectorTranslator
- .convertToJBMFilterString(filterStr)));
-
- List<MessageReference> refs = coreQueue.list(filter);
- for (MessageReference ref : refs)
- {
- coreQueue.deleteReference(ref.getMessage().getMessageID(), storageManager);
- }
- return refs.size();
+ Filter filter = createFilterFromJMSSelector(filterStr);
+ return coreQueue.deleteMatchingReferences(filter, storageManager);
} catch (MessagingException e)
{
throw new IllegalStateException(e.getMessage());
}
}
- public void removeAllMessages() throws Exception
+ public int removeAllMessages() throws Exception
{
- coreQueue.deleteAllReferences(storageManager);
+ return coreQueue.deleteAllReferences(storageManager);
}
public TabularData listAllMessages() throws Exception
@@ -257,9 +255,7 @@
{
try
{
- Filter filter = filterStr == null ? null : new FilterImpl(
- new SimpleString(SelectorTranslator
- .convertToJBMFilterString(filterStr)));
+ Filter filter = createFilterFromJMSSelector(filterStr);
List<MessageReference> messageRefs = coreQueue.list(filter);
List<JMSMessageInfo> infos = new ArrayList<JMSMessageInfo>(messageRefs
@@ -294,9 +290,7 @@
{
try
{
- Filter filter = filterStr == null ? null : new FilterImpl(
- new SimpleString(SelectorTranslator
- .convertToJBMFilterString(filterStr)));
+ Filter filter = createFilterFromJMSSelector(filterStr);
List<MessageReference> refs = coreQueue.list(filter);
for (MessageReference ref : refs)
@@ -354,21 +348,20 @@
+ otherQueueName);
}
- return coreQueue.moveMessage(messageID, binding, storageManager, postOffice);
+ return coreQueue.moveMessage(messageID, binding.getAddress(), storageManager, postOffice);
}
public int moveMatchingMessages(String filterStr, String otherQueueName) throws Exception
{
- Filter filter = filterStr == null ? null : new FilterImpl(new SimpleString(filterStr));
- List<MessageReference> refs = coreQueue.list(filter);
- synchronized (coreQueue)
+ Binding otherBinding = postOffice.getBinding(new SimpleString(otherQueueName));
+ if (otherBinding == null)
{
- for (MessageReference ref : refs)
- {
- moveMessage(ref.getMessage().getMessageID(), otherQueueName);
- }
- return refs.size();
+ throw new IllegalArgumentException("No queue found for "
+ + otherQueueName);
}
+
+ Filter filter = createFilterFromJMSSelector(filterStr);
+ return coreQueue.moveMessages(filter, otherBinding.getAddress(), storageManager, postOffice);
}
public int moveAllMessages(String otherQueueName) throws Exception
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/TopicControl.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -207,16 +207,19 @@
return JMSMessageInfo.toTabularData(infos);
}
- public void removeAllMessages() throws Exception
+ public int removeAllMessages() throws Exception
{
+ int count = 0;
List<Binding> bindings = postOffice.getBindingsForAddress(managedTopic
.getSimpleAddress());
for (Binding binding : bindings)
{
Queue queue = binding.getQueue();
- queue.deleteAllReferences(storageManager);
+ count += queue.deleteAllReferences(storageManager);
}
+
+ return count;
}
public void dropDurableSubscription(String clientID, String subscriptionName) throws Exception
Added: trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/QueueControlTest.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -0,0 +1,343 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.management;
+
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+
+import java.lang.management.ManagementFactory;
+
+import javax.management.MBeanServerInvocationHandler;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.management.QueueControlMBean;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A QueueControlTest
+ *
+ * @author jmesnil
+ *
+ * Created 26 nov. 2008 14:18:48
+ *
+ *
+ */
+public class QueueControlTest extends TestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private MessagingService service;
+
+ // Static --------------------------------------------------------
+
+ private static QueueControlMBean createQueueControl(SimpleString address, SimpleString name) throws Exception
+ {
+ QueueControlMBean queueControl = (QueueControlMBean)MBeanServerInvocationHandler.newProxyInstance(ManagementFactory.getPlatformMBeanServer(),
+ ManagementServiceImpl.getQueueObjectName(address,
+ name),
+ QueueControlMBean.class,
+ false);
+ return queueControl;
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * <ol>
+ * <li>send a message to queue</li>
+ * <li>move all messages from queue to otherQueue using management method</li>
+ * <li>check there is no message to consume from queue</li>
+ * <li>consume the message from otherQueue</li>
+ * </ol>
+ */
+ public void testMoveAllMessages() throws Exception
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ClientSession session = sf.createSession(false, true, true);
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString otherAddress = randomSimpleString();
+ SimpleString otherQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true, true);
+ session.createQueue(otherAddress, otherQueue, null, false, true, true);
+ ClientProducer producer = session.createProducer(address);
+ session.start();
+
+ // send on queue
+ ClientMessage message = session.createClientMessage(false);
+ SimpleString key = randomSimpleString();
+ long value = randomLong();
+ message.putLongProperty(key, value);
+ producer.send(message);
+
+ // wait a little bit to ensure the message is handled by the server
+ Thread.sleep(100);
+ QueueControlMBean queueControl = createQueueControl(address, queue);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // moved all messages to otherQueue
+ int movedMessagesCount = queueControl.moveAllMessages(otherQueue.toString());
+ assertEquals(1, movedMessagesCount);
+ assertEquals(0, queueControl.getMessageCount());
+
+ // check there is no message to consume from queue
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNull(m);
+
+ // consume the message from otherQueue
+ ClientConsumer otherConsumer = session.createConsumer(otherQueue);
+ m = otherConsumer.receive(500);
+ assertEquals(value, m.getProperty(key));
+
+ m.acknowledge();
+
+ consumer.close();
+ session.deleteQueue(queue);
+ otherConsumer.close();
+ session.deleteQueue(otherQueue);
+ session.close();
+ }
+
+ /**
+ * <ol>
+ * <li>send 2 messages to queue</li>
+ * <li>move messages from queue to otherQueue using management method <em>with filter</em></li>
+ * <li>consume the message which <strong>did not</strong> match the filter from queue</li>
+ * <li>consume the message which <strong>did</strong> match the filter from otherQueue</li>
+ * </ol>
+ */
+
+ public void testMoveMatchingMessages() throws Exception // only the message selected by the filter must end up in otherQueue
+ {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1; // always differs from matchingValue, so the filter below cannot select it
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ClientSession session = sf.createSession(false, true, true);
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ SimpleString otherAddress = randomSimpleString();
+ SimpleString otherQueue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true, true);
+ session.createQueue(otherAddress, otherQueue, null, false, true, true);
+ ClientProducer producer = session.createProducer(address);
+ session.start();
+
+ // send two messages to address: one carrying matchingValue, one carrying unmatchingValue
+ ClientMessage matchingMessage = session.createClientMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ producer.send(matchingMessage);
+ ClientMessage unmatchingMessage = session.createClientMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(unmatchingMessage);
+
+ // wait a little bit so that both messages are handled by the server
+ Thread.sleep(100); // NOTE(review): fixed delay rather than a synchronization point; may be timing-sensitive
+ QueueControlMBean queueControl = createQueueControl(address, queue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ // move the messages selected by the filter (key equal to matchingValue) to otherQueue
+ int movedMatchedMessagesCount = queueControl.moveMatchingMessages(key + " =" + matchingValue, otherQueue.toString());
+ assertEquals(1, movedMatchedMessagesCount);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // consume the unmatched message, which must still be in queue
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ assertEquals(unmatchingValue, m.getProperty(key));
+
+ // consume the matched message, which must now be in otherQueue
+ ClientConsumer otherConsumer = session.createConsumer(otherQueue);
+ m = otherConsumer.receive(500);
+ assertNotNull(m);
+ assertEquals(matchingValue, m.getProperty(key));
+
+ m.acknowledge(); // m was reassigned above, so only the message received from otherQueue is acknowledged here
+
+ consumer.close();
+ session.deleteQueue(queue);
+ otherConsumer.close();
+ session.deleteQueue(otherQueue);
+ session.close();
+ }
+
+ /**
+ * <ol>
+ * <li>send 2 messages to queue</li>
+ * <li>remove all messages using management method</li>
+ * <li>check that 2 messages are reported as removed and that the queue's message count is 0</li>
+ * <li>check there is no message to consume from queue</li>
+ * </ol>
+ */
+ public void testRemoveAllMessages() throws Exception // removeAllMessages() must report and remove every message in the queue
+ {
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ClientSession session = sf.createSession(false, true, true);
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true, true);
+ ClientProducer producer = session.createProducer(address);
+ session.start();
+
+ // send 2 messages to the address the queue was created on
+ producer.send(session.createClientMessage(false));
+ producer.send(session.createClientMessage(false));
+
+ // wait a little bit so that both messages are handled by the server
+ Thread.sleep(100); // NOTE(review): fixed delay rather than a synchronization point; may be timing-sensitive
+ QueueControlMBean queueControl = createQueueControl(address, queue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ // delete all messages through the management interface and check the returned count
+ int deletedMessagesCount = queueControl.removeAllMessages();
+ assertEquals(2, deletedMessagesCount);
+ assertEquals(0, queueControl.getMessageCount());
+
+ // check there is no message left to consume from queue
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNull(m);
+
+ consumer.close();
+ session.deleteQueue(queue);
+ session.close();
+ }
+
+ /**
+ * <ol>
+ * <li>send 2 messages to queue</li>
+ * <li>remove messages from queue using management method <em>with filter</em></li>
+ * <li>check there is only one message to consume from queue</li>
+ * </ol>
+ */
+
+ public void testRemoveMatchingMessages() throws Exception // only the message selected by the filter must be removed
+ {
+ SimpleString key = new SimpleString("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1; // always differs from matchingValue, so the filter below cannot select it
+
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ClientSession session = sf.createSession(false, true, true);
+
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, null, false, true, true);
+ ClientProducer producer = session.createProducer(address);
+ session.start();
+
+ // send two messages to address: one carrying matchingValue, one carrying unmatchingValue
+ ClientMessage matchingMessage = session.createClientMessage(false);
+ matchingMessage.putLongProperty(key, matchingValue);
+ producer.send(matchingMessage);
+ ClientMessage unmatchingMessage = session.createClientMessage(false);
+ unmatchingMessage.putLongProperty(key, unmatchingValue);
+ producer.send(unmatchingMessage);
+
+ // wait a little bit so that both messages are handled by the server
+ Thread.sleep(100); // NOTE(review): fixed delay rather than a synchronization point; may be timing-sensitive
+ QueueControlMBean queueControl = createQueueControl(address, queue);
+ assertEquals(2, queueControl.getMessageCount());
+
+ // remove the messages selected by the filter (key equal to matchingValue) from queue
+ int removedMatchedMessagesCount = queueControl.removeMatchingMessages(key + " =" + matchingValue);
+ assertEquals(1, removedMatchedMessagesCount);
+ assertEquals(1, queueControl.getMessageCount());
+
+ // consume the unmatched message, which must still be in queue
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ assertEquals(unmatchingValue, m.getProperty(key));
+
+ m.acknowledge();
+
+ // check there is no other message to consume: the matching one must have been removed
+ m = consumer.receive(500);
+ assertNull(m);
+
+
+ consumer.close();
+ session.deleteQueue(queue);
+ session.close();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception // starts a fresh messaging service before each test; NOTE(review): super.setUp() is not called although tearDown() calls super.tearDown() - confirm this is intended
+ {
+
+ Configuration conf = new ConfigurationImpl();
+ conf.setSecurityEnabled(false);
+ conf.setJMXManagementEnabled(true);
+ conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName())); // in-VM acceptor, counterpart of the InVMConnectorFactory the tests connect with
+ service = MessagingServiceImpl.newNullStorageMessagingServer(conf); // presumably a server without persistent storage - TODO confirm
+ service.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception // stops the service started in setUp(), then runs the superclass tear-down
+ {
+ service.stop(); // NOTE(review): if stop() throws, super.tearDown() below is skipped
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -31,6 +31,7 @@
import static org.jboss.messaging.tests.util.RandomUtil.randomBoolean;
import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomPositiveInt;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import static org.jboss.messaging.tests.util.RandomUtil.randomXid;
@@ -470,7 +471,7 @@
expect(queue.getName()).andReturn(new SimpleString(name));
expect(binding.getQueue()).andReturn(queue);
expect(postOffice.getBinding(new SimpleString(name))).andReturn(binding);
- queue.deleteAllReferences(storageManager);
+ expect(queue.deleteAllReferences(storageManager)).andReturn(randomPositiveInt());
expect(postOffice.removeBinding(new SimpleString(name))).andReturn(
binding);
replayMockedAttributes();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.unit.core.management.impl;
import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.isA;
@@ -32,6 +33,7 @@
import static org.jboss.messaging.tests.util.RandomUtil.randomByte;
import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomPositiveInt;
import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
@@ -318,12 +320,13 @@
public void testRemoveAllMessages() throws Exception
{
- queue.deleteAllReferences(storageManager);
+ int messageRemoved = randomPositiveInt();
+ expect(queue.deleteAllReferences(storageManager)).andReturn(messageRemoved);
replayMockedAttributes();
QueueControlMBean control = createControl();
- control.removeAllMessages();
+ assertEquals(messageRemoved,control.removeAllMessages());
verifyMockedAttributes();
}
@@ -458,46 +461,27 @@
public void testExpireMessagesWithFilter() throws Exception
{
- long messageID_1 = randomLong();
- long messageID_2 = randomLong();
+ int expiredMessagesCount = randomPositiveInt();
- List<MessageReference> refs = new ArrayList<MessageReference>();
- MessageReference ref_1 = createMock(MessageReference.class);
- ServerMessage message_1 = createMock(ServerMessage.class);
- expect(message_1.getMessageID()).andStubReturn(messageID_1);
- expect(ref_1.getMessage()).andReturn(message_1);
- MessageReference ref_2 = createMock(MessageReference.class);
- ServerMessage message_2 = createMock(ServerMessage.class);
- expect(message_2.getMessageID()).andStubReturn(messageID_2);
- expect(ref_2.getMessage()).andReturn(message_2);
- refs.add(ref_1);
- refs.add(ref_2);
- expect(queue.list(isA(Filter.class))).andReturn(refs);
- expect(
- queue.expireMessage(messageID_1, storageManager, postOffice,
- repository)).andReturn(true);
- expect(
- queue.expireMessage(messageID_2, storageManager, postOffice,
- repository)).andReturn(true);
-
+ expect(queue.expireMessages(isA(Filter.class), eq(storageManager), eq(postOffice), eq(repository))).andReturn(expiredMessagesCount);
replayMockedAttributes();
- replay(ref_1, ref_2, message_1, message_2);
QueueControlMBean control = createControl();
- assertEquals(2, control.expireMessages("foo = true"));
+ assertEquals(expiredMessagesCount, control.expireMessages("foo = true"));
verifyMockedAttributes();
- verify(ref_1, ref_2, message_1, message_2);
}
public void testMoveMessage() throws Exception
{
long messageID = randomLong();
SimpleString otherQueueName = randomSimpleString();
+ SimpleString otherAddress = randomSimpleString();
Binding otherBinding = createMock(Binding.class);
+ expect(otherBinding.getAddress()).andReturn(otherAddress);
expect(postOffice.getBinding(otherQueueName)).andReturn(otherBinding);
expect(
- queue.moveMessage(messageID, otherBinding, storageManager,
+ queue.moveMessage(messageID, otherAddress, storageManager,
postOffice)).andReturn(true);
replayMockedAttributes();
@@ -534,10 +518,12 @@
{
long messageID = randomLong();
SimpleString otherQueueName = randomSimpleString();
+ SimpleString otherAddress = randomSimpleString();
Binding otherBinding = createMock(Binding.class);
+ expect(otherBinding.getAddress()).andReturn(otherAddress);
expect(postOffice.getBinding(otherQueueName)).andReturn(otherBinding);
expect(
- queue.moveMessage(messageID, otherBinding, storageManager,
+ queue.moveMessage(messageID, otherAddress, storageManager,
postOffice)).andReturn(false);
replayMockedAttributes();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -337,7 +337,7 @@
EasyMock.replay(queue, toBinding, toQueue, postOffice, persistenceManager, serverMessage, copyMessage, pm);
- messageReference.move(toBinding, persistenceManager, postOffice);
+ messageReference.move(toAddress, persistenceManager, postOffice);
EasyMock.verify(queue, toBinding, toQueue, postOffice, persistenceManager, serverMessage, copyMessage, pm);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -1410,7 +1410,7 @@
assertEquals(0, queue.getDeliveringCount());
assertTrue(queue.getSizeBytes() > 0);
- queue.moveMessage(messageID, toBinding, storageManager, postOffice);
+ queue.moveMessage(messageID, toQueueName, storageManager, postOffice);
assertEquals(0, queue.getMessageCount());
assertEquals(0, queue.getDeliveringCount());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -31,6 +31,7 @@
import static org.jboss.messaging.tests.util.RandomUtil.randomByte;
import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomPositiveInt;
import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
@@ -56,6 +57,7 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.jms.JBossQueue;
import org.jboss.messaging.jms.server.management.impl.JMSQueueControl;
+import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.util.SimpleString;
/**
@@ -301,12 +303,13 @@
public void testRemoveAllMessages() throws Exception
{
- coreQueue.deleteAllReferences(storageManager);
+ int removedMessagesCount = randomPositiveInt();
+ expect(coreQueue.deleteAllReferences(storageManager)).andReturn(removedMessagesCount);
replayMockedAttributes();
JMSQueueControl control = createControl();
- control.removeAllMessages();
+ assertEquals(removedMessagesCount, control.removeAllMessages());
verifyMockedAttributes();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/TopicControlTest.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -27,6 +27,7 @@
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.jboss.messaging.tests.util.RandomUtil.randomInt;
+import static org.jboss.messaging.tests.util.RandomUtil.randomPositiveInt;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import java.util.ArrayList;
@@ -45,6 +46,7 @@
import org.jboss.messaging.jms.JBossTopic;
import org.jboss.messaging.jms.server.management.SubscriptionInfo;
import org.jboss.messaging.jms.server.management.impl.TopicControl;
+import org.jboss.messaging.tests.util.RandomUtil;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -244,6 +246,8 @@
{
String jndiBinding = randomString();
String name = randomString();
+ int removedMessagesFromQueue1 = randomPositiveInt();
+ int removedMessagesFromQueue2 = randomPositiveInt();
JBossTopic topic = new JBossTopic(name);
PostOffice postOffice = createMock(PostOffice.class);
@@ -262,15 +266,15 @@
bindings.add(bindingForQueue_2);
expect(postOffice.getBindingsForAddress(topic.getSimpleAddress()))
.andStubReturn(bindings);
- queue_1.deleteAllReferences(storageManager);
- queue_2.deleteAllReferences(storageManager);
+ expect(queue_1.deleteAllReferences(storageManager)).andReturn(removedMessagesFromQueue1);
+ expect(queue_2.deleteAllReferences(storageManager)).andReturn(removedMessagesFromQueue2);
replay(postOffice, storageManager, bindingforQueue_1, queue_1,
bindingForQueue_2, queue_2);
TopicControl control = new TopicControl(topic, jndiBinding, postOffice,
storageManager);
- control.removeAllMessages();
+ assertEquals(removedMessagesFromQueue1 + removedMessagesFromQueue2, control.removeAllMessages());
verify(postOffice, storageManager, bindingforQueue_1, queue_1,
bindingForQueue_2, queue_2);
Modified: trunk/tests/src/org/jboss/messaging/tests/util/RandomUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/RandomUtil.java 2008-11-27 05:29:02 UTC (rev 5438)
+++ trunk/tests/src/org/jboss/messaging/tests/util/RandomUtil.java 2008-11-27 15:59:16 UTC (rev 5439)
@@ -74,8 +74,7 @@
public static int randomPositiveInt()
{
- final int value = randomInt();
- return value >= 0 ? value : value * -1;
+ return randomInt() & Integer.MAX_VALUE; // clears the sign bit, so the result is in [0, Integer.MAX_VALUE]; Math.abs(Integer.MIN_VALUE) would still be negative
}
public static short randomShort()
More information about the jboss-cvs-commits
mailing list