[jboss-cvs] JBossAS SVN: r57131 - in trunk/messaging/src/main/org/jboss/mq/server: . jmx
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Sep 25 06:11:50 EDT 2006
Author: adrian at jboss.org
Date: 2006-09-25 06:11:43 -0400 (Mon, 25 Sep 2006)
New Revision: 57131
Modified:
trunk/messaging/src/main/org/jboss/mq/server/BasicQueue.java
trunk/messaging/src/main/org/jboss/mq/server/jmx/Queue.java
trunk/messaging/src/main/org/jboss/mq/server/jmx/QueueMBean.java
trunk/messaging/src/main/org/jboss/mq/server/jmx/Topic.java
trunk/messaging/src/main/org/jboss/mq/server/jmx/TopicMBean.java
Log:
Port scheduled and inprocess message operations from jboss4.
Modified: trunk/messaging/src/main/org/jboss/mq/server/BasicQueue.java
===================================================================
--- trunk/messaging/src/main/org/jboss/mq/server/BasicQueue.java 2006-09-25 09:58:42 UTC (rev 57130)
+++ trunk/messaging/src/main/org/jboss/mq/server/BasicQueue.java 2006-09-25 10:11:43 UTC (rev 57131)
@@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@@ -38,8 +39,8 @@
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.DestinationFullException;
+import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyJMSException;
-import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.pm.Tx;
@@ -50,7 +51,7 @@
import org.jboss.util.timeout.TimeoutTarget;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
/**
* This class represents a queue which provides it's messages exclusively to one
@@ -83,8 +84,8 @@
/** Events by message id */
ConcurrentHashMap events = new ConcurrentHashMap();
- /** Estimate of number of tasks on the timer (for the size estimate) */
- SynchronizedInt scheduledMessageCount = new SynchronizedInt(0);
+ /** The scheduled messages */
+ CopyOnWriteArraySet scheduledMessages = new CopyOnWriteArraySet();
/** The JMSServer object */
JMSDestinationManager server;
@@ -326,10 +327,23 @@
*/
public int getScheduledMessageCount()
{
- return scheduledMessageCount.get();
+ return scheduledMessages.size();
}
/**
+ * Returns the number of in process messages for the queue
+ *
+ * @return the in process count
+ */
+ public int getInProcessMessageCount()
+ {
+ synchronized (messages)
+ {
+ return unacknowledgedMessages.size();
+ }
+ }
+
+ /**
* Add a message to the queue
*
* @param mes the message reference
@@ -520,6 +534,98 @@
}
/**
+ * Browse the scheduled messages
+ *
+ * @param selector the selector to apply, pass null for
+ * all messages
+ * @return the messages
+ * @throws JMSException for any error
+ */
+ public List browseScheduled(String selector) throws JMSException
+ {
+ if (selector == null)
+ {
+ ArrayList list;
+ synchronized (messages)
+ {
+ list = new ArrayList(scheduledMessages.size());
+ Iterator iter = scheduledMessages.iterator();
+ while (iter.hasNext())
+ {
+ MessageReference ref = (MessageReference) iter.next();
+ list.add(ref.getMessageForDelivery());
+ }
+ }
+ return list;
+ }
+ else
+ {
+ Selector s = new Selector(selector);
+ LinkedList selection = new LinkedList();
+
+ synchronized (messages)
+ {
+ Iterator iter = scheduledMessages.iterator();
+ while (iter.hasNext())
+ {
+ MessageReference ref = (MessageReference) iter.next();
+ if (s.test(ref.getHeaders()))
+ selection.add(ref.getMessageForDelivery());
+ }
+ }
+
+ return selection;
+ }
+ }
+
+ /**
+ * Browse the in process messages
+ *
+ * @param selector the selector to apply, pass null for
+ * all messages
+ * @return the messages
+ * @throws JMSException for any error
+ */
+ public List browseInProcess(String selector) throws JMSException
+ {
+ if (selector == null)
+ {
+ ArrayList list;
+ synchronized (messages)
+ {
+ list = new ArrayList(unacknowledgedMessages.size());
+ Iterator iter = unacknowledgedMessages.values().iterator();
+ while (iter.hasNext())
+ {
+ UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next();
+ MessageReference ref = unacked.messageRef;
+ list.add(ref.getMessageForDelivery());
+ }
+ }
+ return list;
+ }
+ else
+ {
+ Selector s = new Selector(selector);
+ LinkedList selection = new LinkedList();
+
+ synchronized (messages)
+ {
+ Iterator iter = unacknowledgedMessages.values().iterator();
+ while (iter.hasNext())
+ {
+ UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next();
+ MessageReference ref = unacked.messageRef;
+ if (s.test(ref.getHeaders()))
+ selection.add(ref.getMessageForDelivery());
+ }
+ }
+
+ return selection;
+ }
+ }
+
+ /**
* Receive a message from the queue
*
* @param sub the subscription requiring a message
@@ -744,7 +850,7 @@
dropMessage(message);
}
}
- scheduledMessageCount.set(0);
+ scheduledMessages.clear();
synchronized (receivers)
{
@@ -866,7 +972,6 @@
*/
protected void clearEvents()
{
- scheduledMessageCount.set(0);
for (Iterator i = events.entrySet().iterator(); i.hasNext();)
{
Map.Entry entry = (Map.Entry) i.next();
@@ -877,6 +982,7 @@
i.remove();
}
}
+ scheduledMessages.clear();
}
/**
@@ -889,6 +995,7 @@
Timeout timeout = (Timeout) events.remove(message);
if (timeout != null)
timeout.cancel();
+ scheduledMessages.remove(message);
}
/**
@@ -948,8 +1055,8 @@
long ts = message.messageScheduledDelivery;
if (ts > 0 && ts > System.currentTimeMillis())
{
+ scheduledMessages.add(message);
addTimeout(message, new EnqueueMessageTask(message), ts);
- scheduledMessageCount.increment();
if (trace)
log.trace("scheduled message at " + new Date(ts) + ": " + message);
// Can't deliver now
@@ -1319,8 +1426,8 @@
if (log.isTraceEnabled())
log.trace("scheduled message delivery: " + messageRef);
events.remove(messageRef);
+ scheduledMessages.remove(messageRef);
internalAddMessage(messageRef);
- scheduledMessageCount.decrement();
}
}
@@ -1339,6 +1446,7 @@
public void timedOut(Timeout timout)
{
events.remove(messageRef);
+ scheduledMessages.remove(messageRef);
synchronized (messages)
{
// If the message was already sent, then do nothing
Modified: trunk/messaging/src/main/org/jboss/mq/server/jmx/Queue.java
===================================================================
--- trunk/messaging/src/main/org/jboss/mq/server/jmx/Queue.java 2006-09-25 09:58:42 UTC (rev 57130)
+++ trunk/messaging/src/main/org/jboss/mq/server/jmx/Queue.java 2006-09-25 10:11:43 UTC (rev 57131)
@@ -66,6 +66,13 @@
return destination.queue.getScheduledMessageCount();
}
+ public int getInProcessMessageCount() throws Exception
+ {
+ if (destination == null)
+ return 0;
+ return destination.queue.getInProcessMessageCount();
+ }
+
public void startService() throws Exception
{
if (destinationName == null || destinationName.length() == 0)
@@ -131,6 +138,34 @@
return null;
return Arrays.asList(destination.queue.browse(selector));
}
+
+ public List listScheduledMessages() throws Exception
+ {
+ if (destination == null)
+ return null;
+ return destination.queue.browseScheduled(null);
+ }
+
+ public List listScheduledMessages(String selector) throws Exception
+ {
+ if (destination == null)
+ return null;
+ return destination.queue.browseScheduled(selector);
+ }
+
+ public List listInProcessMessages() throws Exception
+ {
+ if (destination == null)
+ return null;
+ return destination.queue.browseInProcess(null);
+ }
+
+ public List listInProcessMessages(String selector) throws Exception
+ {
+ if (destination == null)
+ return null;
+ return destination.queue.browseInProcess(selector);
+ }
public MessageCounter[] getMessageCounter()
{
Modified: trunk/messaging/src/main/org/jboss/mq/server/jmx/QueueMBean.java
===================================================================
--- trunk/messaging/src/main/org/jboss/mq/server/jmx/QueueMBean.java 2006-09-25 09:58:42 UTC (rev 57130)
+++ trunk/messaging/src/main/org/jboss/mq/server/jmx/QueueMBean.java 2006-09-25 10:11:43 UTC (rev 57131)
@@ -53,6 +53,14 @@
int getScheduledMessageCount() throws Exception;
/**
+ * Gets the InprocessMessageCount attribute of the BasicQueue object
+ *
+ * @return The ScheduledMessageCount value
+ * @exception Exception Description of Exception
+ */
+ int getInProcessMessageCount() throws Exception;
+
+ /**
* Get the number of active receivers
*
* @return the number of receivers
@@ -81,9 +89,43 @@
* @return the messages
* @throws Exception for any error
*/
- List listMessages(java.lang.String selector) throws Exception;
+ List listMessages(String selector) throws Exception;
/**
+ * List the scheduled messages
+ *
+ * @return the messages
+ * @throws Exception for any error
+ */
+ List listScheduledMessages() throws Exception;
+
+ /**
+ * List the scheduled messages matching a selector
+ *
+ * @param selector the selector
+ * @return the messages
+ * @throws Exception for any error
+ */
+ List listScheduledMessages(String selector) throws Exception;
+
+ /**
+ * List the in process messages
+ *
+ * @return the messages
+ * @throws Exception for any error
+ */
+ List listInProcessMessages() throws Exception;
+
+ /**
+ * List the in process messages matching a selector
+ *
+ * @param selector the selector
+ * @return the messages
+ * @throws Exception for any error
+ */
+ List listInProcessMessages(String selector) throws Exception;
+
+ /**
* Get the number of active subscribers
*
* @return the number of subscribers
Modified: trunk/messaging/src/main/org/jboss/mq/server/jmx/Topic.java
===================================================================
--- trunk/messaging/src/main/org/jboss/mq/server/jmx/Topic.java 2006-09-25 09:58:42 UTC (rev 57130)
+++ trunk/messaging/src/main/org/jboss/mq/server/jmx/Topic.java 2006-09-25 10:11:43 UTC (rev 57131)
@@ -238,10 +238,119 @@
BasicQueue queue = findDurableBasicQueue(id, name);
return Arrays.asList(queue.browse(selector));
}
+
+ public long getNonDurableMessageCount(String id, String sub) throws Exception
+ {
+ if (destination == null)
+ return 0;
+ BasicQueue queue = findNonDurableBasicQueue(id, sub);
+ return queue.getQueueDepth();
+ }
+
+ public long getDurableMessageCount(String id, String name) throws Exception
+ {
+ if (destination == null)
+ return 0;
+ BasicQueue queue = findDurableBasicQueue(id, name);
+ return queue.getQueueDepth();
+ }
+
+ public List listNonDurableScheduledMessages(String id, String sub) throws Exception
+ {
+ if (destination == null)
+ return null;
+ BasicQueue queue = findNonDurableBasicQueue(id, sub);
+ return queue.browseScheduled(null);
+ }
+
+ public List listNonDurableScheduledMessages(String id, String sub, String selector) throws Exception
+ {
+ if (destination == null)
+ return null;
+ BasicQueue queue = findNonDurableBasicQueue(id, sub);
+ return queue.browseScheduled(selector);
+ }
+
+ public List listDurableScheduledMessages(String id, String name) throws Exception
+ {
+ if (destination == null)
+ return null;
+ BasicQueue queue = findDurableBasicQueue(id, name);
+ return queue.browseScheduled(null);
+ }
+
+ public List listDurableScheduledMessages(String id, String name, String selector) throws Exception
+ {
+ if (destination == null)
+ return null;
+ BasicQueue queue = findDurableBasicQueue(id, name);
+ return queue.browseScheduled(selector);
+ }
+
+ public long getNonDurableScheduledMessageCount(String id, String sub) throws Exception
+ {
+ if (destination == null)
+ return 0;
+ BasicQueue queue = findNonDurableBasicQueue(id, sub);
+ return queue.getScheduledMessageCount();
+ }
+
+ public long getDurableScheduledMessageCount(String id, String name) throws Exception
+ {
+ if (destination == null)
+ return 0;
+ BasicQueue queue = findDurableBasicQueue(id, name);
+ return queue.getScheduledMessageCount();
+ }
+
+ public List listNonDurableInProcessMessages(String id, String sub) throws Exception
+ {
+ if (destination == null)
+ return null;
+ BasicQueue queue = findNonDurableBasicQueue(id, sub);
+ return queue.browseInProcess(null);
+ }
+
+ public List listNonDurableInProcessMessages(String id, String sub, String selector) throws Exception
+ {
+ if (destination == null)
+ return null;
+ BasicQueue queue = findNonDurableBasicQueue(id, sub);
+ return queue.browseInProcess(selector);
+ }
+
+ public List listDurableInProcessMessages(String id, String name) throws Exception
+ {
+ if (destination == null)
+ return null;
+ BasicQueue queue = findDurableBasicQueue(id, name);
+ return queue.browseInProcess(null);
+ }
+
+ public List listDurableInProcessMessages(String id, String name, String selector) throws Exception
+ {
+ if (destination == null)
+ return null;
+ BasicQueue queue = findDurableBasicQueue(id, name);
+ return queue.browseInProcess(selector);
+ }
+
+ public long getNonDurableInProcessMessageCount(String id, String sub) throws Exception
+ {
+ if (destination == null)
+ return 0;
+ BasicQueue queue = findNonDurableBasicQueue(id, sub);
+ return queue.getInProcessMessageCount();
+ }
+
+ public long getDurableInProcessMessageCount(String id, String name) throws Exception
+ {
+ if (destination == null)
+ return 0;
+ BasicQueue queue = findDurableBasicQueue(id, name);
+ return queue.getInProcessMessageCount();
+ }
- /**
- * @see DestinationMBeanSupport#getMessageCounter()
- */
public MessageCounter[] getMessageCounter()
{
if (destination == null)
Modified: trunk/messaging/src/main/org/jboss/mq/server/jmx/TopicMBean.java
===================================================================
--- trunk/messaging/src/main/org/jboss/mq/server/jmx/TopicMBean.java 2006-09-25 09:58:42 UTC (rev 57130)
+++ trunk/messaging/src/main/org/jboss/mq/server/jmx/TopicMBean.java 2006-09-25 10:11:43 UTC (rev 57131)
@@ -60,4 +60,32 @@
List listDurableMessages(String id, String name) throws Exception;
List listDurableMessages(String id, String name, String selector) throws Exception;
+
+ long getNonDurableMessageCount(String id, String sub) throws Exception;
+
+ long getDurableMessageCount(String id, String name) throws Exception;
+
+ List listNonDurableScheduledMessages(String id, String sub) throws Exception;
+
+ List listNonDurableScheduledMessages(String id, String sub, String selector) throws Exception;
+
+ List listDurableScheduledMessages(String id, String name) throws Exception;
+
+ List listDurableScheduledMessages(String id, String name, String selector) throws Exception;
+
+ long getNonDurableScheduledMessageCount(String id, String sub) throws Exception;
+
+ long getDurableScheduledMessageCount(String id, String name) throws Exception;
+
+ List listNonDurableInProcessMessages(String id, String sub) throws Exception;
+
+ List listNonDurableInProcessMessages(String id, String sub, String selector) throws Exception;
+
+ List listDurableInProcessMessages(String id, String name) throws Exception;
+
+ List listDurableInProcessMessages(String id, String name, String selector) throws Exception;
+
+ long getNonDurableInProcessMessageCount(String id, String sub) throws Exception;
+
+ long getDurableInProcessMessageCount(String id, String name) throws Exception;
}
More information about the jboss-cvs-commits
mailing list