[jboss-cvs] JBoss Messaging SVN: r4750 - in branches/Branch_JBMESSAGING-1303: src/main/org/jboss/messaging/core/management/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 30 11:24:58 EDT 2008


Author: jmesnil
Date: 2008-07-30 11:24:58 -0400 (Wed, 30 Jul 2008)
New Revision: 4750

Modified:
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/MessageReference.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
Log:
JBMESSAGING-1303: Revisit management interfaces
* added operation sendMessageToDLQ(messageID) to QueueControlMBean
* fixed implementation of QueueControl.changePriority()

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-07-30 15:24:58 UTC (rev 4750)
@@ -62,6 +62,10 @@
    int getDeliveringCount();
 
    int getMessagesAdded();
+   
+   String getExpiryQueue();
+   
+   String getDLQ();
 
    // Operations ----------------------------------------------------
 
@@ -97,6 +101,11 @@
          @Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName)
          throws Exception;
 
+   @Operation(desc = "Send the message corresponding to the given messageID to this queue's Dead Letter Queue", impact = ACTION)
+   boolean sendMessageToDLQ(
+         @Parameter(name = "messageID", desc = "A message ID") long messageID)
+         throws Exception;
+   
    @Operation(desc = "Change the priority of the message corresponding to the given messageID", impact = ACTION)
    boolean changeMessagePriority(
          @Parameter(name = "messageID", desc = "A message ID") long messageID,

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-07-30 15:24:58 UTC (rev 4750)
@@ -147,7 +147,17 @@
    {
       return queue.getSizeBytes();
    }
+   
+   public String getDLQ()
+   {
+      return queueSettingsRepository.getMatch(getName()).getDLQ().toString();
+   }
 
+   public String getExpiryQueue()
+   {
+      return queueSettingsRepository.getMatch(getName()).getExpiryQueue().toString();
+   }
+
    public TabularData listAllMessages() throws Exception
    {
       return listMessages(null);
@@ -243,15 +253,17 @@
          throw new IllegalStateException(e.getMessage());
       }
    }
-   
-   public boolean moveMessage(long messageID, String otherQueueName) throws Exception
+
+   public boolean moveMessage(long messageID, String otherQueueName)
+         throws Exception
    {
       Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
       if (binding == null)
       {
-         throw new IllegalArgumentException("No queue found for " + otherQueueName);
+         throw new IllegalArgumentException("No queue found for "
+               + otherQueueName);
       }
-      
+
       List<MessageReference> refs = queue.list(null);
       for (MessageReference ref : refs)
       {
@@ -263,13 +275,30 @@
       }
       return false;
    }
-   
+
+   public boolean sendMessageToDLQ(long messageID) throws Exception
+   {
+      List<MessageReference> refs = queue.list(null);
+      for (MessageReference ref : refs)
+      {
+         ServerMessage message = ref.getMessage();
+         if (message.getMessageID() == messageID)
+         {
+            ref.sendToDLQ(storageManager, postOffice, queueSettingsRepository);
+            queue.removeReferenceWithID(messageID);
+            return true;
+         }
+      }
+      return false;
+   }
+
    public boolean changeMessagePriority(long messageID, int newPriority)
-   throws Exception
+         throws Exception
    {
       if (newPriority < 0 || newPriority > 9)
       {
-         throw new IllegalArgumentException("invalid newPriority value: " + newPriority + ". It must be between 0 and 9 (both included)");
+         throw new IllegalArgumentException("invalid newPriority value: "
+               + newPriority + ". It must be between 0 and 9 (both included)");
       }
       List<MessageReference> refs = queue.list(null);
       for (MessageReference ref : refs)
@@ -278,6 +307,10 @@
          if (message.getMessageID() == messageID)
          {
             message.setPriority((byte) newPriority);
+            // delete and add the reference so that it
+            // goes to the right queues for the new priority
+            queue.removeReferenceWithID(messageID);
+            queue.addLast(ref);
             return true;
          }
       }

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/MessageReference.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/MessageReference.java	2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/MessageReference.java	2008-07-30 15:24:58 UTC (rev 4750)
@@ -66,6 +66,9 @@
    boolean cancel(StorageManager persistenceManager, PostOffice postOffice,
    		         HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;  
    
+   void sendToDLQ(StorageManager persistenceManager, PostOffice postOffice,
+                  HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+   
    void expire(StorageManager persistenceManager, PostOffice postOffice,
    		      HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
    

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-07-30 15:24:58 UTC (rev 4750)
@@ -131,7 +131,7 @@
       }
               
       queue.referenceCancelled();
-
+      
       int maxDeliveries = queueSettingsRepository.getMatch(queue.getName().toString()).getMaxDeliveryAttempts();
       
       if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
@@ -164,6 +164,8 @@
             tx.addAcknowledgement(this);   
          }       
          
+         // tx.commit();
+         
          return false;
       }
       else
@@ -172,6 +174,37 @@
       }
    }
    
+   public void sendToDLQ(StorageManager persistenceManager,
+         PostOffice postOffice,
+         HierarchicalRepository<QueueSettings> queueSettingsRepository)
+         throws Exception
+   {
+      SimpleString DLQ = queueSettingsRepository.getMatch(queue.getName().toString()).getDLQ();
+      
+      Transaction tx = new TransactionImpl(persistenceManager, postOffice);
+               
+      if (DLQ != null)
+      {
+         Binding binding = postOffice.getBinding(DLQ);
+         
+         if (binding == null)
+         {
+            binding = postOffice.addBinding(DLQ, DLQ, null, true, false);
+         }
+         
+         ServerMessage copyMessage = makeCopyForDLQOrExpiry(false, persistenceManager);
+         
+         tx.addMessage(copyMessage);
+         
+         tx.addAcknowledgement(this);      
+         
+         tx.commit();
+      } else
+      {
+         throw new IllegalStateException("No DLQ configured for queue " + queue.getName());
+      }
+   }
+   
    public void expire(final StorageManager persistenceManager, final PostOffice postOffice,
    		final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {

Modified: branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java	2008-07-30 12:27:15 UTC (rev 4749)
+++ branches/Branch_JBMESSAGING-1303/tests/src/org/jboss/messaging/tests/unit/core/management/impl/QueueControlTest.java	2008-07-30 15:24:58 UTC (rev 4750)
@@ -51,6 +51,7 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.HandleStatus;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -340,7 +341,65 @@
 
       verify(queue, storageManager, postOffice, repository);
    }
+   
+   public void testGetDLQ() throws Exception
+   {
+      String queueName = randomString();
+      final String dlqName = randomString();
+      
+      Queue queue = createMock(Queue.class);
+      expect(queue.getName()).andReturn(new SimpleString(queueName));
+      StorageManager storageManager = createMock(StorageManager.class);
+      PostOffice postOffice = createMock(PostOffice.class);
+      HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+      QueueSettings queueSettings = new QueueSettings() 
+      {
+         @Override
+         public SimpleString getDLQ()
+         {
+            return new SimpleString(dlqName);
+         }   
+      };
+      expect(repository.getMatch(queueName)).andReturn(queueSettings);
+      
+      replay(queue, storageManager, postOffice, repository);
 
+      QueueControlMBean control = new QueueControl(queue, storageManager,
+            postOffice, repository);
+      assertEquals(dlqName, control.getDLQ());
+
+      verify(queue, storageManager, postOffice, repository);
+   }
+
+   public void testGetExpiryQueue() throws Exception
+   {
+      String queueName = randomString();
+      final String expiryQueueName = randomString();
+      
+      Queue queue = createMock(Queue.class);
+      expect(queue.getName()).andReturn(new SimpleString(queueName));
+      StorageManager storageManager = createMock(StorageManager.class);
+      PostOffice postOffice = createMock(PostOffice.class);
+      HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+      QueueSettings queueSettings = new QueueSettings() 
+      {
+         @Override
+         public SimpleString getExpiryQueue()
+         {
+            return new SimpleString(expiryQueueName);
+         }   
+      };
+      expect(repository.getMatch(queueName)).andReturn(queueSettings);
+      
+      replay(queue, storageManager, postOffice, repository);
+
+      QueueControlMBean control = new QueueControl(queue, storageManager,
+            postOffice, repository);
+      assertEquals(expiryQueueName, control.getExpiryQueue());
+
+      verify(queue, storageManager, postOffice, repository);
+   }
+   
    public void testRemoveAllMessages() throws Exception
    {
       Queue queue = createMock(Queue.class);
@@ -654,11 +713,13 @@
       expect(ref.getMessage()).andReturn(message);
       message.setPriority((byte) newPriority);
       refs.add(ref);
-      Queue queue = createMock(Queue.class);
-      expect(queue.list(null)).andReturn(refs);
       StorageManager storageManager = createMock(StorageManager.class);
       PostOffice postOffice = createMock(PostOffice.class);
       HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+      Queue queue = createMock(Queue.class);
+      expect(queue.list(null)).andReturn(refs);
+      expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+      expect(queue.addLast(ref)).andReturn(HandleStatus.HANDLED);
 
       replay(queue, storageManager, postOffice, repository, ref, message);
 
@@ -721,6 +782,52 @@
       verify(queue, storageManager, postOffice, repository);
    }
 
+   public void testSendMessageToDLQ() throws Exception
+   {
+      long messageID = randomLong();
+
+      List<MessageReference> refs = new ArrayList<MessageReference>();
+      MessageReference ref = createMock(MessageReference.class);
+      ServerMessage message = createMock(ServerMessage.class);
+      expect(message.getMessageID()).andStubReturn(messageID);
+      expect(ref.getMessage()).andReturn(message);
+      refs.add(ref);
+      Queue queue = createMock(Queue.class);
+      StorageManager storageManager = createMock(StorageManager.class);
+      expect(queue.list(null)).andReturn(refs);
+      PostOffice postOffice = createMock(PostOffice.class);
+      HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+      ref.sendToDLQ(storageManager, postOffice, repository);
+      expect(queue.removeReferenceWithID(messageID)).andReturn(true);
+
+      replay(queue, storageManager, postOffice, repository, ref, message);
+
+      QueueControlMBean control = new QueueControl(queue, storageManager,
+            postOffice, repository);
+      assertTrue(control.sendMessageToDLQ(messageID));
+
+      verify(queue, storageManager, postOffice, repository, ref, message);
+   }
+   
+   public void testSendMessageToDLQWithNoMessageID() throws Exception
+   {
+      long messageID = randomLong();
+
+      Queue queue = createMock(Queue.class);
+      StorageManager storageManager = createMock(StorageManager.class);
+      expect(queue.list(null)).andReturn(new ArrayList<MessageReference>());
+      PostOffice postOffice = createMock(PostOffice.class);
+      HierarchicalRepository<QueueSettings> repository = createMock(HierarchicalRepository.class);
+
+      replay(queue, storageManager, postOffice, repository);
+
+      QueueControlMBean control = new QueueControl(queue, storageManager,
+            postOffice, repository);
+      assertFalse(control.sendMessageToDLQ(messageID));
+
+      verify(queue, storageManager, postOffice, repository);
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list