[jboss-cvs] JBoss Messaging SVN: r5490 - in trunk: src/main/org/jboss/messaging/core/management/impl and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 9 11:49:15 EST 2008
Author: jmesnil
Date: 2008-12-09 11:49:14 -0500 (Tue, 09 Dec 2008)
New Revision: 5490
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/
trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareQueueControlWrapperTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java
Log:
renamed management methods dealing with expiryAddress/deadLetterAddress (instead of expiryQueue/deadLetterQueue)
integration tests for JMSQueue management in a replicated environment
Modified: trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -62,10 +62,18 @@
int getMessagesAdded();
- String getExpiryQueue();
+ String getExpiryAddress();
+ void setExpiryAddress(@Parameter(name = "expiryAddres", desc = "Expiry address of the queue")
+ String expiryAddres)
+ throws Exception;
+
String getDeadLetterAddress();
+ void setDeadLetterAddress(@Parameter(name = "deadLetterAddress", desc = "Dead-letter address of the queue")
+ String deadLetterAddress)
+ throws Exception;
+
boolean isBackup();
// Operations ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -156,15 +156,48 @@
public String getDeadLetterAddress()
{
- return queueSettingsRepository.getMatch(getName()).getDeadLetterAddress().toString();
+ QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
+ if (queueSettings != null && queueSettings.getDeadLetterAddress() != null)
+ {
+ return queueSettings.getDeadLetterAddress().toString();
+ } else
+ {
+ return null;
+ }
}
- public String getExpiryQueue()
+ public void setDeadLetterAddress(String deadLetterAddress) throws Exception
{
- return queueSettingsRepository.getMatch(getName()).getExpiryAddress()
- .toString();
+ QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
+
+ if (deadLetterAddress != null)
+ {
+ queueSettings.setDeadLetterAddress(new SimpleString(deadLetterAddress));
+ }
}
+
+ public String getExpiryAddress()
+ {
+ QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
+ if (queueSettings != null && queueSettings.getExpiryAddress() != null)
+ {
+ return queueSettings.getExpiryAddress().toString();
+ } else
+ {
+ return null;
+ }
+ }
+
+ public void setExpiryAddress(String expiryAddres) throws Exception
+ {
+ QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
+ if (expiryAddres != null)
+ {
+ queueSettings.setExpiryAddress(new SimpleString(expiryAddres));
+ }
+ }
+
public TabularData listAllMessages() throws Exception
{
return listMessages(null);
Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -69,15 +69,25 @@
return localQueueControl.getDeadLetterAddress();
}
+ public void setDeadLetterAddress(String deadLetterAddress) throws Exception
+ {
+ replicationAwareInvoke("setDeadLetterAddress", deadLetterAddress);
+ }
+
public int getDeliveringCount()
{
return localQueueControl.getDeliveringCount();
}
- public String getExpiryQueue()
+ public String getExpiryAddress()
{
- return localQueueControl.getExpiryQueue();
+ return localQueueControl.getExpiryAddress();
}
+
+ public void setExpiryAddress(String expiryAddres) throws Exception
+ {
+ replicationAwareInvoke("setExpiryAddress", expiryAddres);
+ }
public String getFilter()
{
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSQueueControlMBean.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -43,12 +43,14 @@
String getName();
- String getExpiryQueue();
+ String getExpiryAddress();
- void setExpiryAddress(@Parameter(name = "expiryQueue", desc = "Name of the expiry queueur") String expiryQueue) throws Exception;
+ void setExpiryAddress(@Parameter(name = "expiryAddress", desc = "Expiry address of the queue") String expiryAddress) throws Exception;
String getDeadLetterAddress();
+ void setDeadLetterAddress(@Parameter(name = "deadLetterAddress", desc = "Dead-letter address of the queue") String deadLetterAddress) throws Exception;
+
int getMessagesAdded();
boolean isClustered();
@@ -110,7 +112,7 @@
@Operation(desc = "Move the message corresponding to the given messageID to another queue", impact = ACTION)
boolean moveMessage(
- @Parameter(name = "messageID", desc = "A message ID") long messageID,
+ @Parameter(name = "messageID", desc = "A message ID") String messageID,
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName)
throws Exception;
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSQueueControl.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -184,20 +184,30 @@
QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
if (queueSettings != null && queueSettings.getDeadLetterAddress() != null)
{
- return JBossDestination.fromAddress(queueSettings.getDeadLetterAddress().toString()).getName();
+ return queueSettings.getDeadLetterAddress().toString();
}
else
{
return null;
}
}
+
+ public void setDeadLetterAddress(String deadLetterAddress) throws Exception
+ {
+ QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
- public String getExpiryQueue()
+ if (deadLetterAddress != null)
+ {
+ queueSettings.setDeadLetterAddress(new SimpleString(deadLetterAddress));
+ }
+ }
+
+ public String getExpiryAddress()
{
QueueSettings queueSettings = queueSettingsRepository.getMatch(getName());
if (queueSettings != null && queueSettings.getExpiryAddress() != null)
{
- return JBossDestination.fromAddress(queueSettings.getExpiryAddress().toString()).getName();
+ return queueSettings.getExpiryAddress().toString();
}
else
{
@@ -332,15 +342,21 @@
queueSettingsRepository);
}
- public boolean moveMessage(long messageID, String otherQueueName) throws Exception
+ public boolean moveMessage(String messageID, String otherQueueName) throws Exception
{
Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
if (binding == null)
{
throw new IllegalArgumentException("No queue found for " + otherQueueName);
}
+ Filter filter = createFilterForJMSMessageID(messageID);
+ List<MessageReference> refs = coreQueue.list(filter);
+ if (refs.size() != 1)
+ {
+ throw new IllegalArgumentException("No message found for JMSMessageID: " + messageID);
+ }
- return coreQueue.moveMessage(messageID, binding.getAddress(), storageManager, postOffice);
+ return coreQueue.moveMessage(refs.get(0).getMessage().getMessageID(), binding.getAddress(), storageManager, postOffice);
}
public int moveMatchingMessages(String filterStr, String otherQueueName) throws Exception
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSQueueControlWrapper.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -68,15 +68,20 @@
{
return localControl.getDeadLetterAddress();
}
+
+ public void setDeadLetterAddress(String deadLetterAddress) throws Exception
+ {
+ replicationAwareInvoke("setDeadLetterAddress", deadLetterAddress);
+ }
public int getDeliveringCount()
{
return localControl.getDeliveringCount();
}
- public String getExpiryQueue()
+ public String getExpiryAddress()
{
- return localControl.getExpiryQueue();
+ return localControl.getExpiryAddress();
}
public int getMessageCount()
@@ -184,7 +189,7 @@
return (Integer)replicationAwareInvoke("moveMatchingMessages", filter, otherQueueName);
}
- public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
+ public boolean moveMessage(final String messageID, final String otherQueueName) throws Exception
{
return (Boolean)replicationAwareInvoke("moveMessage", messageID, otherQueueName);
}
@@ -204,9 +209,9 @@
return (Boolean)replicationAwareInvoke("sendMessageToDLQ", messageID);
}
- public void setExpiryAddress(final String expiryQueue) throws Exception
+ public void setExpiryAddress(final String expiryAddress) throws Exception
{
- replicationAwareInvoke("setExpiryAddress", expiryQueue);
+ replicationAwareInvoke("setExpiryAddress", expiryAddress);
}
public int removeAllMessages() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareQueueControlWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareQueueControlWrapperTest.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareQueueControlWrapperTest.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -24,6 +24,7 @@
import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
import static org.jboss.messaging.tests.util.RandomUtil.randomSimpleString;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
@@ -413,7 +414,7 @@
assertEquals(0, backupQueueControl.getMessageCount());
}
- public void testSendMessageToDLQ() throws Exception
+ public void testSendMessageToDeadLetterAddress() throws Exception
{
QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
@@ -444,6 +445,44 @@
assertEquals(0, backupQueueControl.getMessageCount());
}
+ public void testSetDeadLetterAddress() throws Exception
+ {
+ String deadLetterAddress = randomString();
+
+ QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+ QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+ assertFalse(liveQueueControl.isBackup());
+ assertTrue(backupQueueControl.isBackup());
+
+ assertNull(liveQueueControl.getDeadLetterAddress());
+ assertNull(backupQueueControl.getDeadLetterAddress());
+
+ liveQueueControl.setDeadLetterAddress(deadLetterAddress);
+
+ assertEquals(deadLetterAddress, liveQueueControl.getDeadLetterAddress());
+ assertEquals(deadLetterAddress, backupQueueControl.getDeadLetterAddress());
+ }
+
+ public void testSetExpiryAddress() throws Exception
+ {
+ String expiryAddress = randomString();
+
+ QueueControlMBean liveQueueControl = createQueueControl(address, address, liveMBeanServer);
+ QueueControlMBean backupQueueControl = createQueueControl(address, address, backupMBeanServer);
+
+ assertFalse(liveQueueControl.isBackup());
+ assertTrue(backupQueueControl.isBackup());
+
+ assertNull(liveQueueControl.getExpiryAddress());
+ assertNull(backupQueueControl.getExpiryAddress());
+
+ liveQueueControl.setExpiryAddress(expiryAddress);
+
+ assertEquals(expiryAddress, liveQueueControl.getExpiryAddress());
+ assertEquals(expiryAddress, backupQueueControl.getExpiryAddress());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareTestBase.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/management/ReplicationAwareTestBase.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -129,6 +129,10 @@
protected void setUp() throws Exception
{
backupMBeanServer = MBeanServerFactory.createMBeanServer();
+ liveMBeanServer = MBeanServerFactory.createMBeanServer();
+
+ assertTrue(backupMBeanServer != liveMBeanServer);
+
Configuration backupConf = new ConfigurationImpl();
backupConf.setSecurityEnabled(false);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -139,7 +143,6 @@
backupService = createNullStorageMessagingServer(backupConf, backupMBeanServer);
backupService.start();
- liveMBeanServer = MBeanServerFactory.createMBeanServer();
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
Added: trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/management/ReplicationAwareJMSQueueControlWrapperTest.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -0,0 +1,412 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.jms.cluster.management;
+
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.openmbean.TabularData;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.management.MessageInfo;
+import org.jboss.messaging.core.management.QueueControlMBean;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.server.impl.JMSServerManagerImpl;
+import org.jboss.messaging.jms.server.management.JMSQueueControlMBean;
+import org.jboss.messaging.jms.server.management.impl.JMSManagementServiceImpl;
+import org.jboss.messaging.tests.integration.cluster.management.ReplicationAwareTestBase;
+import org.jboss.messaging.tests.integration.jms.management.JMSUtil;
+import org.jboss.messaging.tests.integration.jms.management.NullInitialContext;
+
+/**
+ * A ReplicationAwareQueueControlWrapperTest
+ *
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class ReplicationAwareJMSQueueControlWrapperTest extends ReplicationAwareTestBase
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final long timeToSleep = 100;
+
+ private JMSServerManagerImpl liveServerManager;
+
+ private JMSServerManagerImpl backupServerManager;
+
+ private JBossQueue queue;
+
+ private JBossQueue otherQueue;
+
+ private Session session;
+
+ private JMSQueueControlMBean liveQueueControl;
+
+ private JMSQueueControlMBean backupQueueControl;
+
+ private JMSQueueControlMBean liveOtherQueueControl;
+
+ private JMSQueueControlMBean backupOtherQueueControl;
+
+
+ // Static --------------------------------------------------------
+
+ private static JMSQueueControlMBean createQueueControl(String name, MBeanServer mbeanServer) throws Exception
+ {
+ JMSQueueControlMBean queueControl = (JMSQueueControlMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
+ JMSManagementServiceImpl.getJMSQueueObjectName(name),
+ JMSQueueControlMBean.class,
+ false);
+ return queueControl;
+ }
+
+ private static Message sendMessageWithProperty(Session session, Destination destination, String key, long value) throws JMSException
+ {
+ MessageProducer producer = session.createProducer(destination);
+ Message message = session.createMessage();
+ message.setLongProperty(key, value);
+ producer.send(message);
+ return message;
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testChangeMessagePriority() throws Exception
+ {
+ byte oldPriority = (byte)1;
+ byte newPriority = (byte)8;
+
+ // send 1 message
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage(randomString());
+ message.setJMSPriority(oldPriority);
+ producer.send(message);
+
+ // wiat a little bit to give time for the message to be handled by the server
+ Thread.sleep(timeToSleep);
+
+ // check it is on both live & backup nodes
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+
+ assertTrue(liveQueueControl.changeMessagePriority(message.getJMSMessageID(), newPriority));
+ }
+
+ public void testExpireMessage() throws Exception
+ {
+ // send 1 message
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage(randomString());
+ producer.send(message);
+
+ // wiat a little bit to give time for the message to be handled by the server
+ Thread.sleep(timeToSleep);
+
+ // check it is on both live & backup nodes
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+
+ assertTrue(liveQueueControl.expireMessage(message.getJMSMessageID()));
+
+ // check it is on both live & backup nodes
+ assertEquals(0, liveQueueControl.getMessageCount());
+ assertEquals(0, backupQueueControl.getMessageCount());
+ }
+
+ public void testExpireMessagesWithFilter() throws Exception
+ {
+ String key = "key";
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ // send 1 message
+ sendMessageWithProperty(session, queue, key, unmatchingValue);
+ sendMessageWithProperty(session, queue, key, matchingValue);
+
+ // wiat a little bit to give time for the message to be handled by the server
+ Thread.sleep(timeToSleep);
+
+ // check messages are on both live & backup nodes
+ assertEquals(2, liveQueueControl.getMessageCount());
+ assertEquals(2, backupQueueControl.getMessageCount());
+
+ assertEquals(1, liveQueueControl.expireMessages(key + " =" + matchingValue));
+
+ // check there is only 1 message in the queue on both live & backup nodes
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+ }
+
+ public void testMoveAllMessages() throws Exception
+ {
+ // send on queue
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createMessage());
+
+ // wait a little bit to ensure the message is handled by the server
+ Thread.sleep(timeToSleep);
+
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+ assertEquals(0, liveOtherQueueControl.getMessageCount());
+ assertEquals(0, backupOtherQueueControl.getMessageCount());
+
+ // moved all messages to otherQueue
+ int movedMessagesCount = liveQueueControl.moveAllMessages(otherQueue.getAddress());
+ assertEquals(1, movedMessagesCount);
+
+ assertEquals(0, liveQueueControl.getMessageCount());
+ assertEquals(0, backupQueueControl.getMessageCount());
+ assertEquals(1, liveOtherQueueControl.getMessageCount());
+ assertEquals(1, backupOtherQueueControl.getMessageCount());
+ }
+
+ public void testMoveMatchingMessages() throws Exception
+ {
+ String key = new String("key");
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ // send on queue
+ sendMessageWithProperty(session, queue, key, unmatchingValue);
+ sendMessageWithProperty(session, queue, key, matchingValue);
+
+ // wait a little bit to ensure the message is handled by the server
+ Thread.sleep(timeToSleep);
+
+ assertEquals(2, liveQueueControl.getMessageCount());
+ assertEquals(2, backupQueueControl.getMessageCount());
+ assertEquals(0, liveOtherQueueControl.getMessageCount());
+ assertEquals(0, backupOtherQueueControl.getMessageCount());
+
+ // moved matching messages to otherQueue
+ int movedMatchedMessagesCount = liveQueueControl.moveMatchingMessages(key + " =" + matchingValue, otherQueue.getAddress());
+ assertEquals(1, movedMatchedMessagesCount);
+
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+ assertEquals(1, liveOtherQueueControl.getMessageCount());
+ assertEquals(1, backupOtherQueueControl.getMessageCount());
+ }
+
+ public void testMoveMessage() throws Exception
+ {
+ // send on queue
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ producer.send(message);
+
+ // wait a little bit to give time for the message to be handled by the server
+ Thread.sleep(timeToSleep);
+
+ // check it is on both live & backup nodes
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+ assertEquals(0, liveOtherQueueControl.getMessageCount());
+ assertEquals(0, backupOtherQueueControl.getMessageCount());
+
+ assertTrue(liveQueueControl.moveMessage(message.getJMSMessageID(), otherQueue.getAddress()));
+
+ // check the message is no longer in the queue on both live & backup nodes
+ assertEquals(0, liveQueueControl.getMessageCount());
+ assertEquals(0, backupQueueControl.getMessageCount());
+ assertEquals(1, liveOtherQueueControl.getMessageCount());
+ assertEquals(1, backupOtherQueueControl.getMessageCount());
+ }
+
+ public void testRemoveAllMessages() throws Exception
+ {
+ // send 1 message
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createMessage());
+
+ // wiat a little bit to give time for the message to be handled by the server
+ Thread.sleep(timeToSleep);
+
+ // check it is on both live & backup nodes
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+
+ // remove all messages
+ int count = liveQueueControl.removeAllMessages();
+ assertEquals(1, count);
+
+ // check there are no messages on both live & backup nodes
+ assertEquals(0, liveQueueControl.getMessageCount());
+ assertEquals(0, backupQueueControl.getMessageCount());
+ }
+
+ public void testRemoveMatchingMessages() throws Exception
+ {
+ String key = "key";
+ long matchingValue = randomLong();
+ long unmatchingValue = matchingValue + 1;
+
+ // send on queue
+ sendMessageWithProperty(session, queue, key, unmatchingValue);
+ sendMessageWithProperty(session, queue, key, matchingValue);
+
+ // wait a little bit to ensure the message is handled by the server
+ Thread.sleep(timeToSleep );
+
+ assertEquals(2, liveQueueControl.getMessageCount());
+ assertEquals(2, backupQueueControl.getMessageCount());
+
+ // removed matching messages
+ int removedMatchedMessagesCount = liveQueueControl.removeMatchingMessages(key + " =" + matchingValue);
+ assertEquals(1, removedMatchedMessagesCount);
+
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+ }
+
+ public void testRemoveMessage() throws Exception
+ {
+ // send 1 message
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ producer.send(message);
+
+ // wait a little bit to give time for the message to be handled by the server
+ Thread.sleep(timeToSleep);
+
+ // check it is on both live & backup nodes
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+
+ assertTrue(liveQueueControl.removeMessage(message.getJMSMessageID()));
+
+ // check the message is no longer in the queue on both live & backup nodes
+ assertEquals(0, liveQueueControl.getMessageCount());
+ assertEquals(0, backupQueueControl.getMessageCount());
+ }
+
+ public void testSendMessageToDeadLetterAddress() throws Exception
+ {
+ // send 1 message
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ producer.send(message);
+
+ // wait a little bit to give time for the message to be handled by the server
+ Thread.sleep(timeToSleep);
+
+ // check it is on both live & backup nodes
+ assertEquals(1, liveQueueControl.getMessageCount());
+ assertEquals(1, backupQueueControl.getMessageCount());
+
+ assertTrue(liveQueueControl.sendMessageToDLQ(message.getJMSMessageID()));
+
+ // check the message is no longer in the queue on both live & backup nodes
+ assertEquals(0, liveQueueControl.getMessageCount());
+ assertEquals(0, backupQueueControl.getMessageCount());
+ }
+
+ public void testSetDeadLetterAddress() throws Exception
+ {
+ String deadLetterAddress = randomString();
+
+ assertNull(liveQueueControl.getDeadLetterAddress());
+ assertNull(backupQueueControl.getDeadLetterAddress());
+
+ liveQueueControl.setDeadLetterAddress(deadLetterAddress);
+
+ assertEquals(deadLetterAddress, liveQueueControl.getDeadLetterAddress());
+ assertEquals(deadLetterAddress, backupQueueControl.getDeadLetterAddress());
+ }
+
+ public void testSetExpiryAddress() throws Exception
+ {
+ String expiryAddress = randomString();
+
+ assertNull(liveQueueControl.getExpiryAddress());
+ assertNull(backupQueueControl.getExpiryAddress());
+
+ liveQueueControl.setExpiryAddress(expiryAddress);
+
+ assertEquals(expiryAddress, liveQueueControl.getExpiryAddress());
+ assertEquals(expiryAddress, backupQueueControl.getExpiryAddress());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ liveServerManager = JMSServerManagerImpl.newJMSServerManagerImpl(liveService.getServer());
+ liveServerManager.start();
+ liveServerManager.setInitialContext(new NullInitialContext());
+
+ backupServerManager = JMSServerManagerImpl.newJMSServerManagerImpl(backupService.getServer());
+ backupServerManager.start();
+ backupServerManager.setInitialContext(new NullInitialContext());
+
+ String queueName = randomString();
+ liveServerManager.createQueue(queueName, queueName);
+ backupServerManager.createQueue(queueName, queueName);
+ queue = new JBossQueue(queueName);
+
+ String otherQueueName = randomString();
+ liveServerManager.createQueue(otherQueueName, otherQueueName);
+ backupServerManager.createQueue(otherQueueName, otherQueueName);
+ otherQueue = new JBossQueue(otherQueueName);
+
+ Connection connection = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ liveQueueControl = createQueueControl(queue.getQueueName(), liveMBeanServer);
+ backupQueueControl = createQueueControl(queue.getQueueName(), backupMBeanServer);
+ liveOtherQueueControl = createQueueControl(otherQueue.getQueueName(), liveMBeanServer);
+ backupOtherQueueControl = createQueueControl(otherQueue.getQueueName(), backupMBeanServer);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ session.close();
+
+ super.tearDown();
+ }
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSQueueControlTest.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -260,7 +260,6 @@
String expiryQueueName = randomString();
JBossQueue expiryQueue = new JBossQueue(expiryQueueName);
serverManager.createQueue(expiryQueueName, expiryQueueName);
- // FIXME we must be able to pass the queue name, not its address
queueControl.setExpiryAddress(expiryQueue.getAddress());
JMSQueueControlMBean expiryQueueControl = createQueueControl(expiryQueue);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSUtil.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -73,7 +73,7 @@
// Static --------------------------------------------------------
- static Connection createConnection(String connectorFactory) throws JMSException
+ public static Connection createConnection(String connectorFactory) throws JMSException
{
JBossConnectionFactory cf = new JBossConnectionFactory(new TransportConfiguration(connectorFactory),
null,
@@ -142,7 +142,7 @@
return s.createConsumer(destination);
}
- static MessageConsumer createConsumer(Destination destination, boolean startConnection) throws JMSException
+ public static MessageConsumer createConsumer(Destination destination, boolean startConnection) throws JMSException
{
return createConsumer(destination,
startConnection,
@@ -184,7 +184,7 @@
return s.createDurableSubscriber(topic, subscriptionName);
}
- static void sendMessages(Destination destination, int messagesToSend) throws Exception
+ public static void sendMessages(Destination destination, int messagesToSend) throws Exception
{
JBossConnectionFactory cf = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
null,
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -313,7 +313,7 @@
replayMockedAttributes();
QueueControlMBean control = createControl();
- assertEquals(expiryQueueName, control.getExpiryQueue());
+ assertEquals(expiryQueueName, control.getExpiryAddress());
verifyMockedAttributes();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java 2008-12-09 16:36:28 UTC (rev 5489)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/server/management/impl/JMSQueueControlTest.java 2008-12-09 16:49:14 UTC (rev 5490)
@@ -231,16 +231,16 @@
verifyMockedAttributes();
}
- public void testGetDLQ() throws Exception
+ public void testGetDeadLetterAddress() throws Exception
{
- final String dlq = randomString();
+ final String deadLetterAddress = randomString();
QueueSettings settings = new QueueSettings()
{
@Override
public SimpleString getDeadLetterAddress()
{
- return new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + dlq);
+ return new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + deadLetterAddress);
}
};
expect(queueSettingsRepository.getMatch(name)).andReturn(settings);
@@ -248,12 +248,12 @@
replayMockedAttributes();
JMSQueueControl control = createControl();
- assertEquals(dlq, control.getDeadLetterAddress());
+ assertEquals(settings.getDeadLetterAddress().toString(), control.getDeadLetterAddress());
verifyMockedAttributes();
}
- public void testGetExpiryQueue() throws Exception
+ public void testGetExpiryAddress() throws Exception
{
final String expiryQueue = randomString();
@@ -271,7 +271,7 @@
replayMockedAttributes();
JMSQueueControl control = createControl();
- assertEquals(expiryQueue, control.getExpiryQueue());
+ assertEquals(settings.getExpiryAddress().toString(), control.getExpiryAddress());
verifyMockedAttributes();
}
More information about the jboss-cvs-commits
mailing list