[jboss-cvs] JBoss Messaging SVN: r4706 - in branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core: management/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 22 08:36:47 EDT 2008
Author: jmesnil
Date: 2008-07-22 08:36:47 -0400 (Tue, 22 Jul 2008)
New Revision: 4706
Modified:
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
Log:
JBMESSAGING-1303: Revisit management interfaces
* added removeMessage(long messageID) operation to QueueControlMBean
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-07-22 09:37:56 UTC (rev 4705)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/QueueControlMBean.java 2008-07-22 12:36:47 UTC (rev 4706)
@@ -60,7 +60,12 @@
TabularData listMessages(
@Parameter(name = "filter", desc = "A message filter") String filter)
throws Exception;
-
+
@Operation(desc = "Remove all the messages from the queue", impact = ACTION)
void removeAllMessages() throws Exception;
+
+ /**
+  * Removes the message identified by the given message ID from the queue.
+  *
+  * @param messageID the ID of the message to remove
+  * @return {@code true} if a message with that ID was found and removed,
+  *         {@code false} if no such message was in the queue
+  * @throws Exception if the removal fails
+  */
+ @Operation(desc = "Remove the message corresponding to the given messageID", impact = ACTION)
+ boolean removeMessage(
+       @Parameter(name = "messageID", desc = "A message ID") long messageID)
+       throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-07-22 09:37:56 UTC (rev 4705)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/management/impl/QueueControl.java 2008-07-22 12:36:47 UTC (rev 4706)
@@ -156,6 +156,17 @@
}
}
+ /**
+  * Removes the message with the given ID by delegating to
+  * {@code queue.deleteReference(messageID, storageManager)}.
+  *
+  * @param messageID the ID of the message to remove
+  * @return the value returned by the queue's {@code deleteReference} call
+  * @throws IllegalStateException if the queue reports a {@code MessagingException};
+  *         only the message text of that exception is carried over
+  * @throws Exception any other exception raised by the queue is propagated unchanged
+  */
+ public boolean removeMessage(long messageID) throws Exception
+ {
+    final boolean removed;
+    try
+    {
+       removed = queue.deleteReference(messageID, storageManager);
+    }
+    catch (MessagingException failure)
+    {
+       // NOTE(review): the cause is not chained, only its message is reused.
+       // Possibly deliberate so management clients do not need the
+       // MessagingException class -- confirm before changing.
+       throw new IllegalStateException(failure.getMessage());
+    }
+    return removed;
+ }
+
// StandardMBean overrides ---------------------------------------
@Override
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java 2008-07-22 09:37:56 UTC (rev 4705)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/Queue.java 2008-07-22 12:36:47 UTC (rev 4706)
@@ -116,7 +116,10 @@
MessageReference getReference(long id);
void deleteAllReferences(StorageManager storageManager) throws Exception;
-
+
+ /**
+  * Deletes from this queue the reference to the message with the given ID.
+  *
+  * @param messageID the ID of the message whose reference should be deleted
+  * @param storageManager the storage manager the implementation uses to perform the deletion
+  * @return presumably {@code true} when a matching reference was found and deleted,
+  *         {@code false} otherwise -- confirm against the implementing class
+  * @throws Exception if the deletion fails
+  */
+ boolean deleteReference(long messageID, StorageManager storageManager)
+ throws Exception;
+
void lock();
void unlock();
Modified: branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-07-22 09:37:56 UTC (rev 4705)
+++ branches/Branch_JBMESSAGING-1303/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-07-22 12:36:47 UTC (rev 4706)
@@ -22,28 +22,37 @@
package org.jboss.messaging.core.server.impl;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.server.*;
+import org.jboss.messaging.core.server.Consumer;
+import org.jboss.messaging.core.server.DistributionPolicy;
+import org.jboss.messaging.core.server.HandleStatus;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.SimpleString;
-import java.util.*;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
/**
*
* Implementation of a Queue
@@ -482,6 +491,32 @@
tx.commit();
}
+ /**
+  * Deletes the first reference in this queue whose message carries the given ID.
+  * <p>
+  * A new transaction is created for the call. The matching reference, if any, is
+  * acknowledged within that transaction and removed from the reference list through
+  * the iterator. The transaction is committed whether or not a match was found.
+  *
+  * @param messageID the ID of the message whose reference should be deleted
+  * @param storageManager the storage manager handed to the transaction
+  * @return {@code true} if a matching reference was found and removed, {@code false} otherwise
+  * @throws Exception if acknowledging the reference or committing the transaction fails
+  */
+ public synchronized boolean deleteReference(long messageID, final StorageManager storageManager) throws Exception
+ {
+    final Transaction removalTx = new TransactionImpl(storageManager, null);
+
+    boolean found = false;
+
+    // Stop scanning as soon as the first matching reference has been handled.
+    for (Iterator<MessageReference> cursor = messageReferences.iterator(); !found && cursor.hasNext();)
+    {
+       final MessageReference candidate = cursor.next();
+       if (candidate.getMessage().getMessageID() != messageID)
+       {
+          continue;
+       }
+
+       // NOTE(review): the delivering count is bumped before the acknowledgement,
+       // presumably because acknowledging decrements it -- confirm in TransactionImpl.
+       deliveringCount.incrementAndGet();
+       removalTx.addAcknowledgement(candidate);
+       cursor.remove();
+       found = true;
+    }
+
+    removalTx.commit();
+
+    return found;
+ }
+
public void lock()
{
lock.lock();
More information about the jboss-cvs-commits
mailing list