[jboss-cvs] JBoss Messaging SVN: r4750 - in branches/Branch_JBMESSAGING-1303: src/main/org/jboss/messaging/core/management/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 30 11:24:58 EDT 2008
Author: jmesnil
Date: 2008-07-30 11:24:58 -0400 (Wed, 30 Jul 2008)
New Revision: 4750
Modified:
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/MessageReference.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
Log:
JBMESSAGING-1303: Revisit management interfaces
* added operation sendMessageToDLQ(messageID) to QueueControlMBean
* fixed implementation of QueueControl.changeMessagePriority()
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-07-30 15:24:58 UTC (rev 4750)
@@ -62,6 +62,10 @@
int getDeliveringCount();
int getMessagesAdded();
+
+ String getExpiryQueue();
+
+ String getDLQ();
// Operations ----------------------------------------------------
@@ -97,6 +101,11 @@
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName)
throws Exception;
+ @Operation(desc = "Send the message corresponding to the given messageID to this queue's Dead Letter Queue", impact = ACTION)
+ boolean sendMessageToDLQ(
+ @Parameter(name = "messageID", desc = "A message ID") long messageID)
+ throws Exception;
+
@Operation(desc = "Change the priority of the message corresponding to the given messageID", impact = ACTION)
boolean changeMessagePriority(
@Parameter(name = "messageID", desc = "A message ID") long messageID,
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-07-30 15:24:58 UTC (rev 4750)
@@ -147,7 +147,17 @@
{
return queue.getSizeBytes();
}
+
+ public String getDLQ()
+ {
+ return queueSettingsRepository.getMatch(getName()).getDLQ().toString();
+ }
+ public String getExpiryQueue()
+ {
+ return queueSettingsRepository.getMatch(getName()).getExpiryQueue().toString();
+ }
+
public TabularData listAllMessages() throws Exception
{
return listMessages(null);
@@ -243,15 +253,17 @@
throw new IllegalStateException(e.getMessage());
}
}
-
- public boolean moveMessage(long messageID, String otherQueueName) throws Exception
+
+ public boolean moveMessage(long messageID, String otherQueueName)
+ throws Exception
{
Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
if (binding == null)
{
- throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ throw new IllegalArgumentException("No queue found for "
+ + otherQueueName);
}
-
+
List<MessageReference> refs = queue.list(null);
for (MessageReference ref : refs)
{
@@ -263,13 +275,30 @@
}
return false;
}
-
+
+ public boolean sendMessageToDLQ(long messageID) throws Exception
+ {
+ List<MessageReference> refs = queue.list(null);
+ for (MessageReference ref : refs)
+ {
+ ServerMessage message = ref.getMessage();
+ if (message.getMessageID() == messageID)
+ {
+ ref.sendToDLQ(storageManager, postOffice, queueSettingsRepository);
+ queue.removeReferenceWithID(messageID);
+ return true;
+ }
+ }
+ return false;
+ }
+
public boolean changeMessagePriority(long messageID, int newPriority)
- throws Exception
+ throws Exception
{
if (newPriority < 0 || newPriority > 9)
{
- throw new IllegalArgumentException("invalid newPriority value: " + newPriority + ". It must be between 0 and 9 (both included)");
+ throw new IllegalArgumentException("invalid newPriority value: "
+ + newPriority + ". It must be between 0 and 9 (both included)");
}
List<MessageReference> refs = queue.list(null);
for (MessageReference ref : refs)
@@ -278,6 +307,10 @@
if (message.getMessageID() == messageID)
{
message.setPriority((byte) newPriority);
+ // delete and add the reference so that it
+ // goes to the right queues for the new priority
+ queue.removeReferenceWithID(messageID);
+ queue.addLast(ref);
return true;
}
}
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/MessageReference.java 2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/MessageReference.java 2008-07-30 15:24:58 UTC (rev 4750)
@@ -66,6 +66,9 @@
boolean cancel(StorageManager persistenceManager, PostOffice postOffice,
HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+ void sendToDLQ(StorageManager persistenceManager, PostOffice postOffice,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+
void expire(StorageManager persistenceManager, PostOffice postOffice,
HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-07-30 15:24:58 UTC (rev 4750)
@@ -131,7 +131,7 @@
}
queue.referenceCancelled();
-
+
int maxDeliveries = queueSettingsRepository.getMatch(queue.getName().toString()).getMaxDeliveryAttempts();
if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
@@ -164,6 +164,8 @@
tx.addAcknowledgement(this);
}
+ // tx.commit();
+
return false;
}
else
@@ -172,6 +174,37 @@
}
}
+ public void sendToDLQ(StorageManager persistenceManager,
+ PostOffice postOffice,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ throws Exception
+ {
+ SimpleString DLQ = queueSettingsRepository.getMatch(queue.getName().toString()).getDLQ();
+
+ Transaction tx = new TransactionImpl(persistenceManager, postOffice);
+
+ if (DLQ != null)
+ {
+ Binding binding = postOffice.getBinding(DLQ);
+
+ if (binding == null)
+ {
+ binding = postOffice.addBinding(DLQ, DLQ, null, true, false);
+ }
+
+ ServerMessage copyMessage = makeCopyForDLQOrExpiry(false, persistenceManager);
+
+ tx.addMessage(copyMessage);
+
+ tx.addAcknowledgement(this);
+
+ tx.commit();
+ } else
+ {
+ throw new IllegalStateException("No DLQ configured for queue " + queue.getName());
+ }
+ }
+
public void expire(final StorageManager persistenceManager, final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java 2008-07-30 15:24:58 UTC (rev 4750)
@@ -51,6 +51,7 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
@@ -340,7 +341,65 @@
verify(queue, storageManager, postOffice, repository);
}
+
+ public void testGetDLQ() throws Exception
+ {
+ String queueName = randomString();
+ final String dlqName = randomString();
+
+ Queue queue = createMock(Queue.class);
+ expect(queue.getName()).andReturn(new SimpleString(queueName));
+ StorageManager storageManager = createMock(StorageManager.class);
+ PostOffice postOffice = createMock(PostOffice.class);
+ HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+ QueueSettings queueSettings = new QueueSettings()
+ {
+ @Override
+ public SimpleString getDLQ()
+ {
+ return new SimpleString(dlqName);
+ }
+ };
+ expect(repository.getMatch(queueName)).andReturn(queueSettings);
+
+ replay(queue, storageManager, postOffice, repository);
+ QueueControlMBean control = new QueueControl(queue, storageManager,
+ postOffice, repository);
+ assertEquals(dlqName, control.getDLQ());
+
+ verify(queue, storageManager, postOffice, repository);
+ }
+
+ public void testGetExpiryQueue() throws Exception
+ {
+ String queueName = randomString();
+ final String expiryQueueName = randomString();
+
+ Queue queue = createMock(Queue.class);
+ expect(queue.getName()).andReturn(new SimpleString(queueName));
+ StorageManager storageManager = createMock(StorageManager.class);
+ PostOffice postOffice = createMock(PostOffice.class);
+ HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+ QueueSettings queueSettings = new QueueSettings()
+ {
+ @Override
+ public SimpleString getExpiryQueue()
+ {
+ return new SimpleString(expiryQueueName);
+ }
+ };
+ expect(repository.getMatch(queueName)).andReturn(queueSettings);
+
+ replay(queue, storageManager, postOffice, repository);
+
+ QueueControlMBean control = new QueueControl(queue, storageManager,
+ postOffice, repository);
+ assertEquals(expiryQueueName, control.getExpiryQueue());
+
+ verify(queue, storageManager, postOffice, repository);
+ }
+
public void testRemoveAllMessages() throws Exception
{
Queue queue = createMock(Queue.class);
@@ -654,11 +713,13 @@
expect(ref.getMessage()).andReturn(message);
message.setPriority((byte) newPriority);
refs.add(ref);
- Queue queue = createMock(Queue.class);
- expect(queue.list(null)).andReturn(refs);
StorageManager storageManager = createMock(StorageManager.class);
PostOffice postOffice = createMock(PostOffice.class);
HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+ Queue queue = createMock(Queue.class);
+ expect(queue.list(null)).andReturn(refs);
+ expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+ expect(queue.addLast(ref)).andReturn(HandleStatus.HANDLED);
replay(queue, storageManager, postOffice, repository, ref, message);
@@ -721,6 +782,52 @@
verify(queue, storageManager, postOffice, repository);
}
+ public void testSendMessageToDLQ() throws Exception
+ {
+ long messageID = randomLong();
+
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+ MessageReference ref = createMock(MessageReference.class);
+ ServerMessage message = createMock(ServerMessage.class);
+ expect(message.getMessageID()).andStubReturn(messageID);
+ expect(ref.getMessage()).andReturn(message);
+ refs.add(ref);
+ Queue queue = createMock(Queue.class);
+ StorageManager storageManager = createMock(StorageManager.class);
+ expect(queue.list(null)).andReturn(refs);
+ PostOffice postOffice = createMock(PostOffice.class);
+ HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+ ref.sendToDLQ(storageManager, postOffice, repository);
+ expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+
+ replay(queue, storageManager, postOffice, repository, ref, message);
+
+ QueueControlMBean control = new QueueControl(queue, storageManager,
+ postOffice, repository);
+ assertTrue(control.sendMessageToDLQ(messageID));
+
+ verify(queue, storageManager, postOffice, repository, ref, message);
+ }
+
+ public void testSendMessageToDLQWithNoMessageID() throws Exception
+ {
+ long messageID = randomLong();
+
+ Queue queue = createMock(Queue.class);
+ StorageManager storageManager = createMock(StorageManager.class);
+ expect(queue.list(null)).andReturn(new ArrayList<MessageReference>());
+ PostOffice postOffice = createMock(PostOffice.class);
+ HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+
+ replay(queue, storageManager, postOffice, repository);
+
+ QueueControlMBean control = new QueueControl(queue, storageManager,
+ postOffice, repository);
+ assertFalse(control.sendMessageToDLQ(messageID));
+
+ verify(queue, storageManager, postOffice, repository);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list