[jboss-cvs] JBossAS SVN: r57130 - in branches/Branch_4_0/messaging/src/main/org/jboss/mq/server: . jmx

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Sep 25 05:58:49 EDT 2006


Author: adrian at jboss.org
Date: 2006-09-25 05:58:42 -0400 (Mon, 25 Sep 2006)
New Revision: 57130

Modified:
   branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/BasicQueue.java
   branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/Queue.java
   branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/QueueMBean.java
   branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/Topic.java
   branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/TopicMBean.java
Log:
[JBAS-3228] and [JBAS-3386] - Stats and browsing for scheduled
and in process (unacknowledged) messages.

Modified: branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/BasicQueue.java
===================================================================
--- branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/BasicQueue.java	2006-09-25 09:48:29 UTC (rev 57129)
+++ branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/BasicQueue.java	2006-09-25 09:58:42 UTC (rev 57130)
@@ -27,6 +27,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -38,8 +39,8 @@
 import org.jboss.logging.Logger;
 import org.jboss.mq.AcknowledgementRequest;
 import org.jboss.mq.DestinationFullException;
+import org.jboss.mq.SpyDestination;
 import org.jboss.mq.SpyJMSException;
-import org.jboss.mq.SpyDestination;
 import org.jboss.mq.SpyMessage;
 import org.jboss.mq.Subscription;
 import org.jboss.mq.pm.Tx;
@@ -50,7 +51,7 @@
 import org.jboss.util.timeout.TimeoutTarget;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
+import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
 
 /**
  *  This class represents a queue which provides it's messages exclusively to one
@@ -83,8 +84,8 @@
    /** Events by message id */
    ConcurrentHashMap events = new ConcurrentHashMap();
    
-   /** Estimate of number of tasks on the timer (for the size estimate) */
-   SynchronizedInt scheduledMessageCount = new SynchronizedInt(0);
+   /** The scheduled messages */
+   CopyOnWriteArraySet scheduledMessages = new CopyOnWriteArraySet();
 
    /** The JMSServer object */
    JMSDestinationManager server;
@@ -326,10 +327,23 @@
     */
    public int getScheduledMessageCount()
    {
-      return scheduledMessageCount.get();
+      return scheduledMessages.size();
    }
 
    /**
+    * Returns the number of in process messages for the queue
+    * 
+    * @return the in process count
+    */
+   public int getInProcessMessageCount()
+   {
+      synchronized (messages)
+      {
+         return unacknowledgedMessages.size();
+      }
+   }
+
+   /**
     * Add a message to the queue
     *
     * @param mes the message reference
@@ -520,6 +534,98 @@
    }
 
    /**
+    * Browse the scheduled messages
+    *
+    * @param selector the selector to apply, pass null for
+    *                 all messages
+    * @return the messages
+    * @throws JMSException for any error
+    */
+   public List browseScheduled(String selector) throws JMSException
+   {
+      if (selector == null)
+      {
+         ArrayList list;
+         synchronized (messages)
+         {
+            list = new ArrayList(scheduledMessages.size());
+            Iterator iter = scheduledMessages.iterator();
+            while (iter.hasNext())
+            {
+               MessageReference ref = (MessageReference) iter.next();
+               list.add(ref.getMessageForDelivery());
+            }
+         }
+         return list;
+      }
+      else
+      {
+         Selector s = new Selector(selector);
+         LinkedList selection = new LinkedList();
+
+         synchronized (messages)
+         {
+            Iterator iter = scheduledMessages.iterator();
+            while (iter.hasNext())
+            {
+               MessageReference ref = (MessageReference) iter.next();
+               if (s.test(ref.getHeaders()))
+                  selection.add(ref.getMessageForDelivery());
+            }
+         }
+         
+         return selection;
+      }
+   }
+
+   /**
+    * Browse the in process messages
+    *
+    * @param selector the selector to apply, pass null for
+    *                 all messages
+    * @return the messages
+    * @throws JMSException for any error
+    */
+   public List browseInProcess(String selector) throws JMSException
+   {
+      if (selector == null)
+      {
+         ArrayList list;
+         synchronized (messages)
+         {
+            list = new ArrayList(unacknowledgedMessages.size());
+            Iterator iter = unacknowledgedMessages.values().iterator();
+            while (iter.hasNext())
+            {
+               UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next();
+               MessageReference ref = unacked.messageRef;
+               list.add(ref.getMessageForDelivery());
+            }
+         }
+         return list;
+      }
+      else
+      {
+         Selector s = new Selector(selector);
+         LinkedList selection = new LinkedList();
+
+         synchronized (messages)
+         {
+            Iterator iter = unacknowledgedMessages.values().iterator();
+            while (iter.hasNext())
+            {
+               UnackedMessageInfo unacked = (UnackedMessageInfo) iter.next();
+               MessageReference ref = unacked.messageRef;
+               if (s.test(ref.getHeaders()))
+                  selection.add(ref.getMessageForDelivery());
+            }
+         }
+         
+         return selection;
+      }
+   }
+
+   /**
     * Receive a message from the queue
     *
     * @param sub the subscription requiring a message
@@ -744,7 +850,7 @@
             dropMessage(message);
          }
       }
-      scheduledMessageCount.set(0);
+      scheduledMessages.clear();
 
       synchronized (receivers)
       {
@@ -866,7 +972,6 @@
     */
    protected void clearEvents()
    {
-      scheduledMessageCount.set(0);
       for (Iterator i = events.entrySet().iterator(); i.hasNext();)
       {
          Map.Entry entry = (Map.Entry) i.next();
@@ -877,6 +982,7 @@
             i.remove();
          }
       }
+      scheduledMessages.clear();
    }
    
    /**
@@ -889,6 +995,7 @@
       Timeout timeout = (Timeout) events.remove(message);
       if (timeout != null)
          timeout.cancel();
+      scheduledMessages.remove(message);
    }
    
    /**
@@ -948,8 +1055,8 @@
       long ts = message.messageScheduledDelivery;
       if (ts > 0 && ts > System.currentTimeMillis())
       {
+         scheduledMessages.add(message);
          addTimeout(message, new EnqueueMessageTask(message), ts);
-         scheduledMessageCount.increment();
          if (trace)
             log.trace("scheduled message at " + new Date(ts) + ": " + message);
          // Can't deliver now
@@ -1319,8 +1426,8 @@
          if (log.isTraceEnabled())
             log.trace("scheduled message delivery: " + messageRef);
          events.remove(messageRef);
+         scheduledMessages.remove(messageRef);
          internalAddMessage(messageRef);
-         scheduledMessageCount.decrement();
       }
    }
 
@@ -1339,6 +1446,7 @@
       public void timedOut(Timeout timout)
       {
          events.remove(messageRef);
+         scheduledMessages.remove(messageRef);
          synchronized (messages)
          {
             // If the message was already sent, then do nothing

Modified: branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/Queue.java
===================================================================
--- branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/Queue.java	2006-09-25 09:48:29 UTC (rev 57129)
+++ branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/Queue.java	2006-09-25 09:58:42 UTC (rev 57130)
@@ -66,6 +66,13 @@
       return destination.queue.getScheduledMessageCount();
    }
 
+   public int getInProcessMessageCount() throws Exception
+   {
+      if (destination == null)
+         return 0;
+      return destination.queue.getInProcessMessageCount();
+   }
+
    public void startService() throws Exception
    {
       if (destinationName == null || destinationName.length() == 0)
@@ -131,6 +138,34 @@
          return null;
       return Arrays.asList(destination.queue.browse(selector));
    }
+
+   public List listScheduledMessages() throws Exception
+   {
+      if (destination == null)
+         return null;
+      return destination.queue.browseScheduled(null);
+   }
+
+   public List listScheduledMessages(String selector) throws Exception
+   {
+      if (destination == null)
+         return null;
+      return destination.queue.browseScheduled(selector);
+   }
+
+   public List listInProcessMessages() throws Exception
+   {
+      if (destination == null)
+         return null;
+      return destination.queue.browseInProcess(null);
+   }
+
+   public List listInProcessMessages(String selector) throws Exception
+   {
+      if (destination == null)
+         return null;
+      return destination.queue.browseInProcess(selector);
+   }
    
    public MessageCounter[] getMessageCounter()
    {

Modified: branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/QueueMBean.java
===================================================================
--- branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/QueueMBean.java	2006-09-25 09:48:29 UTC (rev 57129)
+++ branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/QueueMBean.java	2006-09-25 09:58:42 UTC (rev 57130)
@@ -53,6 +53,14 @@
    int getScheduledMessageCount() throws Exception;
 
    /**
+    * Gets the InprocessMessageCount attribute of the BasicQueue object
+    * 
+    * @return The ScheduledMessageCount value
+    * @exception Exception Description of Exception    
+    */
+   int getInProcessMessageCount() throws Exception;
+
+   /**
     * Get the number of active receivers
     * 
     * @return the number of receivers
@@ -81,9 +89,43 @@
     * @return the messages
     * @throws Exception for any error
     */
-   List listMessages(java.lang.String selector) throws Exception;
+   List listMessages(String selector) throws Exception;
 
    /**
+    * List the scheduled messages
+    * 
+    * @return the messages
+    * @throws Exception for any error
+    */
+   List listScheduledMessages() throws Exception;
+
+   /**
+    * List the scheduled messages matching a selector
+    * 
+    * @param selector the selector
+    * @return the messages
+    * @throws Exception for any error
+    */
+   List listScheduledMessages(String selector) throws Exception;
+
+   /**
+    * List the in process messages
+    * 
+    * @return the messages
+    * @throws Exception for any error
+    */
+   List listInProcessMessages() throws Exception;
+
+   /**
+    * List the in process messages matching a selector
+    * 
+    * @param selector the selector
+    * @return the messages
+    * @throws Exception for any error
+    */
+   List listInProcessMessages(String selector) throws Exception;
+
+   /**
     * Get the number of active subscribers
     * 
     * @return the number of subscribers

Modified: branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/Topic.java
===================================================================
--- branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/Topic.java	2006-09-25 09:48:29 UTC (rev 57129)
+++ branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/Topic.java	2006-09-25 09:58:42 UTC (rev 57130)
@@ -238,10 +238,119 @@
       BasicQueue queue = findDurableBasicQueue(id, name);
       return Arrays.asList(queue.browse(selector));
    }
+
+   public long getNonDurableMessageCount(String id, String sub) throws Exception
+   {
+      if (destination == null)
+         return 0;
+      BasicQueue queue = findNonDurableBasicQueue(id, sub);
+      return queue.getQueueDepth();
+   }
+
+   public long getDurableMessageCount(String id, String name) throws Exception
+   {
+      if (destination == null)
+         return 0;
+      BasicQueue queue = findDurableBasicQueue(id, name);
+      return queue.getQueueDepth();
+   }
+
+   public List listNonDurableScheduledMessages(String id, String sub) throws Exception
+   {
+      if (destination == null)
+         return null;
+      BasicQueue queue = findNonDurableBasicQueue(id, sub);
+      return queue.browseScheduled(null);
+   }
+
+   public List listNonDurableScheduledMessages(String id, String sub, String selector) throws Exception
+   {
+      if (destination == null)
+         return null;
+      BasicQueue queue = findNonDurableBasicQueue(id, sub);
+      return queue.browseScheduled(selector);
+   }
+
+   public List listDurableScheduledMessages(String id, String name) throws Exception
+   {
+      if (destination == null)
+         return null;
+      BasicQueue queue = findDurableBasicQueue(id, name);
+      return queue.browseScheduled(null);
+   }
+
+   public List listDurableScheduledMessages(String id, String name, String selector) throws Exception
+   {
+      if (destination == null)
+         return null;
+      BasicQueue queue = findDurableBasicQueue(id, name);
+      return queue.browseScheduled(selector);
+   }
+
+   public long getNonDurableScheduledMessageCount(String id, String sub) throws Exception
+   {
+      if (destination == null)
+         return 0;
+      BasicQueue queue = findNonDurableBasicQueue(id, sub);
+      return queue.getScheduledMessageCount();
+   }
+
+   public long getDurableScheduledMessageCount(String id, String name) throws Exception
+   {
+      if (destination == null)
+         return 0;
+      BasicQueue queue = findDurableBasicQueue(id, name);
+      return queue.getScheduledMessageCount();
+   }
+
+   public List listNonDurableInProcessMessages(String id, String sub) throws Exception
+   {
+      if (destination == null)
+         return null;
+      BasicQueue queue = findNonDurableBasicQueue(id, sub);
+      return queue.browseInProcess(null);
+   }
+
+   public List listNonDurableInProcessMessages(String id, String sub, String selector) throws Exception
+   {
+      if (destination == null)
+         return null;
+      BasicQueue queue = findNonDurableBasicQueue(id, sub);
+      return queue.browseInProcess(selector);
+   }
+
+   public List listDurableInProcessMessages(String id, String name) throws Exception
+   {
+      if (destination == null)
+         return null;
+      BasicQueue queue = findDurableBasicQueue(id, name);
+      return queue.browseInProcess(null);
+   }
+
+   public List listDurableInProcessMessages(String id, String name, String selector) throws Exception
+   {
+      if (destination == null)
+         return null;
+      BasicQueue queue = findDurableBasicQueue(id, name);
+      return queue.browseInProcess(selector);
+   }
+
+   public long getNonDurableInProcessMessageCount(String id, String sub) throws Exception
+   {
+      if (destination == null)
+         return 0;
+      BasicQueue queue = findNonDurableBasicQueue(id, sub);
+      return queue.getInProcessMessageCount();
+   }
+
+   public long getDurableInProcessMessageCount(String id, String name) throws Exception
+   {
+      if (destination == null)
+         return 0;
+      BasicQueue queue = findDurableBasicQueue(id, name);
+      return queue.getInProcessMessageCount();
+   }
    
-   /**
-    * @see DestinationMBeanSupport#getMessageCounter()
-    */
    public MessageCounter[] getMessageCounter()
    {
       if (destination == null)

Modified: branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/TopicMBean.java
===================================================================
--- branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/TopicMBean.java	2006-09-25 09:48:29 UTC (rev 57129)
+++ branches/Branch_4_0/messaging/src/main/org/jboss/mq/server/jmx/TopicMBean.java	2006-09-25 09:58:42 UTC (rev 57130)
@@ -60,4 +60,32 @@
    List listDurableMessages(String id, String name) throws Exception;
 
    List listDurableMessages(String id, String name, String selector) throws Exception;
+
+   long getNonDurableMessageCount(String id, String sub) throws Exception;
+
+   long getDurableMessageCount(String id, String name) throws Exception;
+
+   List listNonDurableScheduledMessages(String id, String sub) throws Exception;
+
+   List listNonDurableScheduledMessages(String id, String sub, String selector) throws Exception;
+
+   List listDurableScheduledMessages(String id, String name) throws Exception;
+
+   List listDurableScheduledMessages(String id, String name, String selector) throws Exception;
+
+   long getNonDurableScheduledMessageCount(String id, String sub) throws Exception;
+
+   long getDurableScheduledMessageCount(String id, String name) throws Exception;
+
+   List listNonDurableInProcessMessages(String id, String sub) throws Exception;
+
+   List listNonDurableInProcessMessages(String id, String sub, String selector) throws Exception;
+
+   List listDurableInProcessMessages(String id, String name) throws Exception;
+
+   List listDurableInProcessMessages(String id, String name, String selector) throws Exception;
+
+   long getNonDurableInProcessMessageCount(String id, String sub) throws Exception;
+
+   long getDurableInProcessMessageCount(String id, String name) throws Exception;
 }




More information about the jboss-cvs-commits mailing list