[jboss-cvs] JBoss Messaging SVN: r4706 - in branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core: management/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 22 08:36:47 EDT 2008


Author: jmesnil
Date: 2008-07-22 08:36:47 -0400 (Tue, 22 Jul 2008)
New Revision: 4706

Modified:
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
   branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
JBMESSAGING-1303: Revisit management interfaces
* added removeMessage(long messageID) operation to QueueControlMBean


Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-07-22 09:37:56 UTC (rev 4705)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java	2008-07-22 12:36:47 UTC (rev 4706)
@@ -60,7 +60,12 @@
    TabularData listMessages(
          @Parameter(name = "filter", desc = "A message filter") String filter)
          throws Exception;
-   
+
    @Operation(desc = "Remove all the messages from the queue", impact = ACTION)
    void removeAllMessages() throws Exception;
+
+   @Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
+   public boolean removeMessage(
+         @Parameter(name = "messageID", desc = "A message ID") long messageID)
+         throws Exception;
 }

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-07-22 09:37:56 UTC (rev 4705)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java	2008-07-22 12:36:47 UTC (rev 4706)
@@ -156,6 +156,17 @@
       }
    }
 
+   public boolean removeMessage(long messageID) throws Exception
+   {
+      try
+      {
+         return queue.deleteReference(messageID, storageManager);
+      } catch (MessagingException e)
+      {
+         throw new IllegalStateException(e.getMessage());
+      }
+   }
+
    // StandardMBean overrides ---------------------------------------
 
    @Override

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java	2008-07-22 09:37:56 UTC (rev 4705)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java	2008-07-22 12:36:47 UTC (rev 4706)
@@ -116,7 +116,10 @@
    MessageReference getReference(long id);
    
    void deleteAllReferences(StorageManager storageManager) throws Exception;
-   
+
+   boolean deleteReference(long messageID, StorageManager storageManager)
+   throws Exception;
+
    void lock();
    
    void unlock();

Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-07-22 09:37:56 UTC (rev 4705)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2008-07-22 12:36:47 UTC (rev 4706)
@@ -22,28 +22,37 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.list.PriorityLinkedList;
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
 import org.jboss.messaging.util.SimpleString;
 
-import java.util.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 /**
  * 
  * Implementation of a Queue
@@ -482,6 +491,32 @@
       tx.commit();
    }
    
+   public synchronized boolean deleteReference(long messageID, final StorageManager storageManager) throws Exception
+   {
+      boolean deleted = false;
+      
+      Transaction tx = new TransactionImpl(storageManager, null);
+
+      Iterator<MessageReference> iter = messageReferences.iterator();
+
+      while (iter.hasNext())
+      {
+         MessageReference ref = iter.next();
+         if (ref.getMessage().getMessageID() == messageID)
+         {         
+            deliveringCount.incrementAndGet();
+            tx.addAcknowledgement(ref);
+            iter.remove();
+            deleted = true;
+            break;
+         }
+      }
+
+      tx.commit();
+      
+      return deleted;
+   }
+   
    public void lock()
    {
       lock.lock();




More information about the jboss-cvs-commits mailing list