[jboss-cvs] JBoss Messaging SVN: r8082 - in branches/Branch_1_4: integration/AS5/etc/xmdesc and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 24 11:27:43 EDT 2010


Author: gaohoward
Date: 2010-08-24 11:27:41 -0400 (Tue, 24 Aug 2010)
New Revision: 8082

Added:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/DeliveringCounter.java
Modified:
   branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
   branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml
   branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml
   branches/Branch_1_4/integration/EAP5/etc/xmdesc/Queue-xmbean.xml
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedQueue.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java
Log:
JBMESSAGING-1753


Modified: branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_1_4/docs/userguide/en/modules/configuration.xml	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/docs/userguide/en/modules/configuration.xml	2010-08-24 15:27:41 UTC (rev 8082)
@@ -1654,9 +1654,29 @@
 
           <para>Returns the total number of messages in the queue = number not
           being delivered + number being delivered + number being
-          scheduled</para>
+          scheduled.</para>
         </section>
 
+        <section id="conf.destination.queue.attributes.deliveringcount">
+          <title>DeliveringCount</title>
+
+          <para>Returns the number of messages currently being delivered. Based on configuration,
+          this attribute may be disabled, in which case it always returns zero. See the DeliveringCounterLevel attribute.
+          </para>
+        </section>
+
+        <section id="conf.destination.queue.attributes.deliveringcounterlevel">
+          <title>DeliveringCounterLevel</title>
+
+          <para>This attribute controls how the queue tracks its delivering count. It has three possible string
+          values: NONE, COUNTER and ALL. NONE means the queue doesn't keep track of messages being delivered.
+          COUNTER means the queue keeps track of the number of messages currently being delivered, but not
+          of the messages themselves. ALL means the queue keeps track of both the number of and the
+          references to the messages being delivered. The last option is the most expensive one, as it needs
+          additional data structures to hold those references. The default is COUNTER.
+          </para>
+        </section>
+
         <section id="conf.destination.queue.attributes.scheduledmessagecount">
           <title>ScheduledMessageCount</title>
 
@@ -1789,6 +1809,18 @@
           match the criteria</para>
         </section>
 
+        <section id="conf.destination.queue.operations.listinprocessmessages">
+          <title>ListInProcessMessages</title>
+
+          <para>List all messages currently being delivered.
+          <warning>
+                To use this functionality you must set the queue's DeliveringCounterLevel attribute
+                to "ALL". Otherwise it always returns an empty list. By default the attribute
+                is set to "COUNTER".
+          </warning>
+          </para>
+        </section>
+
         <section id="conf.destination.queue.operations.resetmessagecounter">
           <title>ResetMessageCounter</title>
 

Modified: branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml	2010-08-24 15:27:41 UTC (rev 8082)
@@ -159,6 +159,12 @@
       <type>int</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="getDeliveringCounterLevel" setMethod="setDeliveringCounterLevel">
+      <description>The delivering counter functionality level (ALL, COUNTER, NONE)</description>
+      <name>DeliveringCounterLevel</name>
+      <type>java.lang.String</type>
+   </attribute>
+   
    <attribute access="read-only" getMethod="getConsumerCount">
       <description>The number of consumers on the queue</description>
       <name>ConsumerCount</name>
@@ -218,6 +224,12 @@
    </operation>   
    
    <operation>
+      <description>List all messages being delivered</description>
+      <name>listInProcessMessages</name>
+      <return-type>java.util.List</return-type>
+   </operation>
+
+   <operation>
       <description>List durable messages</description>
       <name>listDurableMessages</name>
       <return-type>java.util.List</return-type>

Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml	2010-08-24 15:27:41 UTC (rev 8082)
@@ -165,6 +165,12 @@
       <type>int</type>
    </attribute>   
    
+   <attribute access="read-write" getMethod="getDeliveringCounterLevel" setMethod="setDeliveringCounterLevel">
+      <description>The delivering counter functionality level (ALL, COUNTER, NONE)</description>
+      <name>DeliveringCounterLevel</name>
+      <type>java.lang.String</type>
+   </attribute>
+   
    <!-- instance access -->
 
    <attribute access="read-only" getMethod="getInstance">
@@ -218,6 +224,12 @@
    </operation>   
    
    <operation>
+      <description>List all messages being delivered</description>
+      <name>listInProcessMessages</name>
+      <return-type>java.util.List</return-type>
+   </operation>
+   
+   <operation>
       <description>List durable messages</description>
       <name>listDurableMessages</name>
       <return-type>java.util.List</return-type>

Modified: branches/Branch_1_4/integration/EAP5/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/xmdesc/Queue-xmbean.xml	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/integration/EAP5/etc/xmdesc/Queue-xmbean.xml	2010-08-24 15:27:41 UTC (rev 8082)
@@ -159,6 +159,12 @@
       <type>int</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="getDeliveringCounterLevel" setMethod="setDeliveringCounterLevel">
+      <description>The delivering counter functionality level (ALL, COUNTER, NONE)</description>
+      <name>DeliveringCounterLevel</name>
+      <type>java.lang.String</type>
+   </attribute>
+   
    <attribute access="read-only" getMethod="getConsumerCount">
       <description>The number of consumers on the queue</description>
       <name>ConsumerCount</name>
@@ -218,6 +224,12 @@
    </operation>   
    
    <operation>
+      <description>List all messages being delivered</description>
+      <name>listInProcessMessages</name>
+      <return-type>java.util.List</return-type>
+   </operation>
+
+   <operation>
       <description>List durable messages</description>
       <name>listDurableMessages</name>
       <return-type>java.util.List</return-type>

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedQueue.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedQueue.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -30,6 +30,7 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.Queue;
+import org.jboss.messaging.core.impl.DeliveringCounter;
 
 /**
  * A ManagedQueue
@@ -58,6 +59,8 @@
    
    private Queue queue;
 
+   private int deliveringCounterLevel = DeliveringCounter.LEVEL_COUNTER;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public ManagedQueue()
@@ -106,6 +109,16 @@
       return count;
    }
    
+   /**
+    * Returns the delivering counter level stored on this managed queue.
+    * The field starts out as DeliveringCounter.LEVEL_COUNTER.
+    *
+    * @return the stored level as an int
+    */
+   public int getDeliveringCounterLevel()
+   {
+      return this.deliveringCounterLevel;
+   }
+   
+   /**
+    * Stores the delivering counter level on this managed queue.
+    * Only the field is updated here; the value is not validated and is not
+    * pushed to the underlying core queue by this method.
+    *
+    * @param level the level to remember
+    */
+   public void setDeliveringCounterLevel(int level)
+   {
+      this.deliveringCounterLevel = level;
+   }
+
    public int getScheduledMessageCount() throws Exception
    {     
       int count = queue.getScheduledCount();
@@ -133,6 +146,11 @@
    {
       return this.listMessages(ALL, selector);
    }
+
+   /**
+    * Asks the underlying core queue for its in-process messages.
+    *
+    * @return whatever list the core queue's listInProcessMessages() returns
+    * @throws Exception declared for callers; nothing is thrown explicitly here
+    */
+   public List listInProcessMessages() throws Exception
+   {
+      List inProcess = queue.listInProcessMessages();
+      return inProcess;
+   }
    
    public List listDurableMessages(String selector) throws Exception
    {

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -15,6 +15,7 @@
 import org.jboss.messaging.core.contract.Binding;
 import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.contract.Queue;
+import org.jboss.messaging.core.impl.DeliveringCounter;
 import org.jboss.messaging.core.impl.MessagingQueue;
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.XMLUtil;
@@ -110,6 +111,9 @@
             // Must be done after load
             queue.setMaxSize(destination.getMaxSize());
             
+            int delCounterLevel = DeliveringCounter.toLevel(getDeliveringCounterLevel());
+            queue.setDeliveringCounterLevel(delCounterLevel);
+            
             queue.activate();           
          }
          else
@@ -125,8 +129,9 @@
                                        destination.getMaxSize(), null,
                                        destination.getFullSize(), destination.getPageSize(),
                                        destination.getDownCacheSize(), destination.isClustered(),
-                                       serverPeer.getRecoverDeliveriesTimeout());
-            po.addBinding(new Binding(queueCond, queue, false), false);         
+                                       serverPeer.getRecoverDeliveriesTimeout(),
+                                       ((ManagedQueue)destination).getDeliveringCounterLevel());
+            po.addBinding(new Binding(queueCond, queue, false), false);
             
             queue.activate();
          }
@@ -294,6 +299,29 @@
    {
       return ((ManagedQueue)destination).getConsumersCount();
    }
+   
+   /**
+    * JMX getter for the DeliveringCounterLevel attribute.
+    *
+    * @return the managed queue's level rendered by DeliveringCounter.toString(int)
+    * @throws Exception if the stored level cannot be converted to a string
+    */
+   public String getDeliveringCounterLevel() throws Exception
+   {
+      ManagedQueue managedQueue = (ManagedQueue)destination;
+      return DeliveringCounter.toString(managedQueue.getDeliveringCounterLevel());
+   }
+   
+   /**
+    * JMX setter for the DeliveringCounterLevel attribute.
+    *
+    * The level is only accepted while the service is not started. Once started,
+    * the call logs a warning and leaves the current level untouched.
+    *
+    * @param level textual level, converted with DeliveringCounter.toLevel(String)
+    * @throws Exception the exception produced by ExceptionUtil.handleJMXInvocation
+    *         for any failure, e.g. a level string that toLevel does not recognise
+    */
+   public void setDeliveringCounterLevel(String level) throws Exception
+   {
+      try
+      {
+         if (started)
+         {
+            log.warn("DeliveringCounterLevel can only be set before the queue is started.");
+            return;
+         }
+         ((ManagedQueue)destination).setDeliveringCounterLevel(DeliveringCounter.toLevel(level));
+      }
+      catch (Throwable t)
+      {
+         // The context string must name this setter so a reported failure
+         // is not attributed to the getter.
+         throw ExceptionUtil.handleJMXInvocation(t, this + " setDeliveringCounterLevel");
+      }
+   }
      
    // JMX managed operations -----------------------------------------------------------------------
       
@@ -333,6 +361,24 @@
       } 
    }
    
+   /**
+    * JMX operation: lists the in-process messages of the managed queue.
+    *
+    * @return the managed queue's in-process message list, or null (after a
+    *         warning is logged) when the service is not started
+    * @throws Exception the exception produced by ExceptionUtil.handleJMXInvocation
+    *         for any failure
+    */
+   public List listInProcessMessages() throws Exception
+   {
+      try
+      {
+         if (started)
+         {
+            ManagedQueue managedQueue = (ManagedQueue)destination;
+            return managedQueue.listInProcessMessages();
+         }
+
+         log.warn("Queue is stopped.");
+         return null;
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMXInvocation(t, this + " listInProcessMessages");
+      }
+   }
+   
    public List listAllMessages(String selector) throws Exception
    {
       try

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -98,4 +98,8 @@
    void setClustered(boolean isClustered);
 
    void staticMerge(Queue queue) throws Exception;
+
+   /**
+    * Lists the messages this queue currently regards as being delivered.
+    * NOTE(review): the content depends on the implementation's delivering
+    * counter level; confirm against each implementing class.
+    *
+    * @return a list of in-process messages
+    */
+   List listInProcessMessages();
+
+   /**
+    * Sets the delivering counter level of this queue.
+    *
+    * @param delCounterLevel the new level; presumably one of the
+    *        DeliveringCounter LEVEL_* constants (verify against callers)
+    */
+   void setDeliveringCounterLevel(int delCounterLevel);
 }

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -97,7 +97,7 @@
    //Having to keep this count requires synchronization between delivery thread and acknowledgement
    //thread which will hamper concurrency
    //Suggest that we have a flag that disables this for production systems
-   protected SynchronizedInt deliveringCount;
+   protected DeliveringCounter deliveringCount;
 
    protected Set scheduledDeliveries;
 
@@ -110,10 +110,15 @@
    protected OrderingGroupMonitor monitor = new OrderingGroupMonitor();
 
    // Constructors ---------------------------------------------------------------------------------
-
+   /**
+    * Creates a channel with the default delivering counter level by
+    * delegating to the five-argument constructor with
+    * DeliveringCounter.LEVEL_COUNTER.
+    */
    protected ChannelSupport(long channelID, PersistenceManager pm,
                             boolean recoverable, int maxSize)
    {
+      this(channelID, pm, recoverable, maxSize, DeliveringCounter.LEVEL_COUNTER);
+   }
+
+   protected ChannelSupport(long channelID, PersistenceManager pm,
+                            boolean recoverable, int maxSize, int delCounterLevel)
+   {
       if (trace) { log.trace("creating " + (pm != null ? "recoverable " : "non-recoverable ") + "channel[" + channelID + "]"); }
 
       this.pm = pm;
@@ -126,7 +131,7 @@
 
       lock = new Object();
 
-      deliveringCount = new SynchronizedInt(0);
+      deliveringCount = new DeliveringCounter(delCounterLevel);
 
       scheduledDeliveries = new HashSet();
 
@@ -315,7 +320,7 @@
 
       if (!del.isRecovered())
       {
-      	deliveringCount.decrement();
+      	deliveringCount.decrement(ref.getMessage());
       }
 
       if (!checkAndSchedule(ref))
@@ -335,6 +340,11 @@
    {
       return recoverable;
    }
+   
+   /**
+    * Returns the messages held by this channel's delivering counter.
+    *
+    * @return the list produced by DeliveringCounter.getMessages()
+    */
+   public List listInProcessMessages()
+   {
+      List inProcess = this.deliveringCount.getMessages();
+      return inProcess;
+   }
 
    public List browse(Filter filter)
    {
@@ -358,6 +368,7 @@
             MessageReference ref = (MessageReference) i.next();
             messages.add(ref.getMessage());
          }
+         
          return messages;
       }
    }
@@ -433,7 +444,7 @@
          }
 
          clearAllScheduledDeliveries(true);
-         deliveringCount.set(0);
+         deliveringCount.reset();
 
          log.trace(this + " done removing all references, there are " + this.messageRefs.size());
       }
@@ -535,6 +546,11 @@
          }
       }
    }
+   
+   /**
+    * Forwards the given level to this channel's delivering counter via
+    * DeliveringCounter.setLevel(int).
+    *
+    * @param lvl the level to apply
+    */
+   public void setDeliveringCounterLevel(int lvl)
+   {
+      this.deliveringCount.setLevel(lvl);
+   }
 
    public int getMessagesAdded()
    {
@@ -730,7 +746,7 @@
                         }
                      }
 
-                     deliveringCount.increment();
+                     deliveringCount.increment(ref.getMessage());
                   }
                }
             }
@@ -777,7 +793,7 @@
 
                // Receiver accepted the reference
 
-               deliveringCount.increment();
+               deliveringCount.increment(ref.getMessage());
 
                return true;
             }
@@ -827,7 +843,7 @@
 
          if (!d.isRecovered())
          {
-         	deliveringCount.decrement();
+         	deliveringCount.decrement(d.getReference().getMessage());
          }
          
          MessageReference ref = d.getReference();
@@ -1047,7 +1063,7 @@
 
                if (!del.isRecovered())
                {
-               	deliveringCount.decrement();
+               	deliveringCount.decrement(del.getReference().getMessage());
                }
                
                MessageReference ref = del.getReference();
@@ -1121,7 +1137,7 @@
                   {
                      throw new TransactionException("Failed to add reference", t);
                   }
-                  deliveringCount.decrement();
+                  deliveringCount.decrement(ref.getMessage());
                }
             }
          }

Added: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/DeliveringCounter.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/DeliveringCounter.java	                        (rev 0)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/DeliveringCounter.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -0,0 +1,208 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2010, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Message;
+
+/**
+ * A DeliveringCounter: keeps track of delivering count and messages.
+ * 
+ * Because message delivery and acknowledgement are asynchronous, an acknowledgement may
+ * arrive before the increment triggered by the corresponding delivery. For example,
+ * message m1 is delivered to a consumer and then put into this counter. Normally the ack of m1
+ * comes later and causes the counter to decrement, so m1 is correctly added and then removed.
+ * However, there is a small chance that the ack of m1 arrives before m1 is put into this counter,
+ * so when decrement() is called m1 is not there yet. m1 is then put into the counter later and would
+ * never be cleared because it has missed its ack. This is harmless for a pure delivering count (an
+ * integer) but not when retrieving the messages being delivered: in the case above, m1 would be
+ * returned as "being delivered" even though it has already been acked.
+ * 
+ * Also, retrieving in-process messages requires extra data structure operations to track the
+ * message references, which has a performance cost. Users who don't need this functionality
+ * should not have to pay for it.
+ * 
+ * So the implementation uses three variables:
+ * 
+ * - an AtomicInteger that purely tracks the delivering count;
+ * - a ConcurrentHashMap that holds the messages being delivered;
+ * - a ConcurrentLinkedQueue that holds messages that have been acknowledged but not yet put in the above map.
+ *
+ * In addition, an int flag controls the functionality level:
+ * 
+ * - LEVEL_NONE totally disables the delivering counter.
+ * - LEVEL_COUNTER tracks the delivering count only. This is the default level.
+ * - LEVEL_ALL enables the full delivering counter functionality.
+ * 
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Aug 18, 2010 1:27:46 PM
+ */
+public class DeliveringCounter
+{
+   /** Tracking is switched off: nothing is counted and get() always returns 0. */
+   public static final int LEVEL_NONE = 0;
+
+   /** Only the number of messages being delivered is tracked. This is the default. */
+   public static final int LEVEL_COUNTER = 1;
+
+   /** Both the number of messages being delivered and the messages themselves are tracked. */
+   public static final int LEVEL_ALL = 255;
+
+   private static final Logger log = Logger.getLogger(DeliveringCounter.class);
+
+   // Volatile because setLevel() may change the level while other threads are
+   // calling increment()/decrement()/get().
+   private volatile int level = LEVEL_COUNTER;
+
+   // The three structures below are created once and never replaced, so a level
+   // change can never expose a null reference to a concurrent caller.
+
+   // Messages being delivered, keyed by message id (used at LEVEL_ALL only).
+   private final ConcurrentHashMap<Long, Message> msgCount = new ConcurrentHashMap<Long, Message>();
+
+   // Messages whose ack arrived before they were put into msgCount (LEVEL_ALL only).
+   private final ConcurrentLinkedQueue<Message> earlyMsgs = new ConcurrentLinkedQueue<Message>();
+
+   // Number of messages being delivered (not updated at LEVEL_NONE).
+   private final AtomicInteger counter = new AtomicInteger(0);
+
+   /** Creates a counter at the default level, LEVEL_COUNTER. */
+   public DeliveringCounter()
+   {
+      this(LEVEL_COUNTER);
+   }
+
+   /**
+    * Creates a counter at the given level.
+    *
+    * @param lv LEVEL_NONE, LEVEL_COUNTER or LEVEL_ALL; the value is not validated
+    */
+   public DeliveringCounter(int lv)
+   {
+      initLevel(lv);
+   }
+
+   /**
+    * Switches to the given level. For any level other than LEVEL_NONE the count
+    * restarts from zero, and for LEVEL_ALL the tracked messages are dropped as well.
+    * The new level is published last, after the state has been reset.
+    */
+   private void initLevel(int lv)
+   {
+      log.debug("Delivering Counter enabled: " + lv);
+      if (lv != LEVEL_NONE)
+      {
+         counter.set(0);
+         if (lv == LEVEL_ALL)
+         {
+            msgCount.clear();
+            earlyMsgs.clear();
+         }
+      }
+      level = lv;
+   }
+
+   /**
+    * Records that a message is no longer being delivered.
+    *
+    * @param acked the message leaving the delivering state; at LEVEL_ALL it is
+    *        remembered as an early ack when it was not registered yet
+    */
+   public void decrement(Message acked)
+   {
+      // Read the level once so both checks see the same value.
+      int lv = level;
+      if (lv == LEVEL_NONE) return;
+      counter.decrementAndGet();
+      if (lv != LEVEL_ALL) return;
+      if (msgCount.remove(acked.getMessageID()) == null)
+      {
+         // The ack overtook the matching increment(); getMessages() reconciles it later.
+         earlyMsgs.add(acked);
+      }
+   }
+
+   /**
+    * Records that a message is now being delivered.
+    *
+    * @param msg the message entering the delivering state
+    */
+   public void increment(Message msg)
+   {
+      int lv = level;
+      if (lv == LEVEL_NONE) return;
+      counter.incrementAndGet();
+      if (lv != LEVEL_ALL) return;
+      msgCount.put(msg.getMessageID(), msg);
+   }
+
+   /**
+    * @return the current delivering count, or 0 at LEVEL_NONE
+    */
+   public int get()
+   {
+      if (level == LEVEL_NONE) return 0;
+      return counter.get();
+   }
+
+   /**
+    * Sets the count back to zero and, at LEVEL_ALL, forgets all tracked messages.
+    * Does nothing at LEVEL_NONE.
+    */
+   public void reset()
+   {
+      int lv = level;
+      if (lv == LEVEL_NONE) return;
+      counter.set(0);
+      if (lv != LEVEL_ALL) return;
+      earlyMsgs.clear();
+      msgCount.clear();
+   }
+
+   /**
+    * Returns the messages currently tracked as being delivered.
+    * Early acks are first matched against the tracked messages, and every
+    * matched pair is removed from both structures.
+    *
+    * @return a new list of the tracked messages; an empty list unless the level is LEVEL_ALL
+    */
+   public synchronized List<Message> getMessages()
+   {
+      if (level != LEVEL_ALL) return new ArrayList<Message>();
+      Iterator<Message> early = earlyMsgs.iterator();
+      while (early.hasNext())
+      {
+         if (msgCount.remove(early.next().getMessageID()) != null)
+         {
+            early.remove();
+         }
+      }
+      return new ArrayList<Message>(msgCount.values());
+   }
+
+   /**
+    * Re-initialises the counter at the given level (see initLevel).
+    * Marked "for tests only" by the original author.
+    */
+   public void setLevel(int lv)
+   {
+      initLevel(lv);
+   }
+
+   /**
+    * @return the current level. Marked "for tests only" by the original author.
+    */
+   public int getLevel()
+   {
+      return level;
+   }
+
+   /**
+    * Converts a level constant to its textual name.
+    *
+    * @param lvl LEVEL_NONE, LEVEL_COUNTER or LEVEL_ALL
+    * @return "NONE", "COUNTER" or "ALL"
+    * @throws IllegalArgumentException if lvl is none of the three constants
+    * @throws Exception kept in the signature for source compatibility with existing callers
+    */
+   public static String toString(int lvl) throws Exception
+   {
+      switch (lvl)
+      {
+         case LEVEL_NONE:
+            return "NONE";
+         case LEVEL_COUNTER:
+            return "COUNTER";
+         case LEVEL_ALL:
+            return "ALL";
+         default:
+            throw new IllegalArgumentException("Invalid value: " + lvl);
+      }
+   }
+
+   /**
+    * Converts a textual level name to its constant. The comparison is exact
+    * (case-sensitive, no trimming).
+    *
+    * @param lvl "NONE", "COUNTER" or "ALL"
+    * @return the matching LEVEL_* constant
+    * @throws IllegalArgumentException if lvl is null or not one of the three names
+    * @throws Exception kept in the signature for source compatibility with existing callers
+    */
+   public static int toLevel(String lvl) throws Exception
+   {
+      if ("NONE".equals(lvl))
+      {
+         return LEVEL_NONE;
+      }
+      if ("COUNTER".equals(lvl))
+      {
+         return LEVEL_COUNTER;
+      }
+      if ("ALL".equals(lvl))
+      {
+         return LEVEL_ALL;
+      }
+      throw new IllegalArgumentException("Invalid value: \"" + lvl + "\"");
+   }
+}

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -142,7 +142,17 @@
    	
    	setup(nodeID, name, filter, clustered, DEFAULT_RECOVER_DELIVERIES_TIMEOUT);   	
    }
-   
+
+   /**
+    * Creates a queue through the constructor that takes the same arguments
+    * without the trailing level, then applies the requested delivering counter
+    * level to the counter.
+    *
+    * @param deliveringCounterLevel level handed to DeliveringCounter.setLevel(int);
+    *        presumably one of the DeliveringCounter LEVEL_* constants (not checked here)
+    */
+   public MessagingQueue(int nodeID, String name, long id, MessageStore ms, PersistenceManager pm,
+                         boolean recoverable, int maxSize, Filter filter,
+                         int fullSize, int pageSize, int downCacheSize, boolean clustered,
+                         long recoverDeliveriesTimeout, int deliveringCounterLevel)
+   {
+      this(nodeID, name, id, ms, pm, recoverable, maxSize, filter,
+           fullSize, pageSize, downCacheSize, clustered, recoverDeliveriesTimeout);
+
+      deliveringCount.setLevel(deliveringCounterLevel);
+   }
+
    private void setup(int nodeID, String name, Filter filter, boolean clustered, long recoverDeliveriesTimeout)
    {
    	this.nodeID = nodeID;
@@ -262,7 +272,7 @@
          			
          			recoveryMap.put(new Long(message.getMessageID()), re);
          			
-         			deliveringCount.increment();
+         			deliveringCount.increment(ref.getMessage());
          			
          			iter.remove();
          			
@@ -357,7 +367,7 @@
 				messageRefs.addFirst(ref, ref.getMessage().getPriority());
 				
 				//Need to decrement the delivery count too
-				deliveringCount.decrement();
+				deliveringCount.decrement(ref.getMessage());
 			}
 							
 			if (trace) { log.trace("Found one, added back on queue"); }   
@@ -699,7 +709,7 @@
 					{		
 						messageRefs.addFirst(ref, ref.getMessage().getPriority());		
 						
-						deliveringCount.decrement();
+						deliveringCount.decrement(ref.getMessage());
 					}					
 					
 					added = true;
@@ -720,4 +730,10 @@
    {
       clustered = isClustered;
    }
+
+   /**
+    * Exposes this queue's delivering counter. Only used by tests.
+    *
+    * @return the counter instance held by this queue
+    */
+   public DeliveringCounter getDeliveringCounter()
+   {
+      return this.deliveringCount;
+   }
 }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -23,8 +23,13 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.Message;
@@ -32,6 +37,7 @@
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.contract.Receiver;
+import org.jboss.messaging.core.impl.DeliveringCounter;
 import org.jboss.messaging.core.impl.JDBCPersistenceManager;
 import org.jboss.messaging.core.impl.MessagingQueue;
 import org.jboss.messaging.core.impl.message.SimpleMessageStore;
@@ -1658,6 +1664,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -1672,6 +1679,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -1735,6 +1745,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -1749,6 +1760,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -1762,12 +1776,19 @@
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       tx.commit();
 
       assertTrue(queue.browse(null).isEmpty());
 
       deliveringCount = queue.getDeliveringCount();
       assertEquals(0, deliveringCount);
+
+      listMessages = queue.listInProcessMessages();
+      assertEquals(0, listMessages.size());
+
    }
 
 
@@ -1782,6 +1803,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -1796,6 +1818,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -1809,11 +1834,17 @@
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       tx.rollback();
 
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       // acknowledge non-transactionally
       r.acknowledge(ackm, null);
 
@@ -1984,6 +2015,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -1998,6 +2030,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List received = r.getMessages();
       assertEquals(1, received.size());
       Message rm = (Message)received.iterator().next();
@@ -2033,6 +2068,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2047,6 +2083,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List received = r.getMessages();
       assertEquals(1, received.size());
       Message rm = (Message)received.iterator().next();
@@ -2060,6 +2099,9 @@
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       tx.commit();
 
       assertTrue(queue.browse(null).isEmpty());
@@ -2185,6 +2227,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2210,6 +2253,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -2290,6 +2336,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2316,6 +2363,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List received = r.getMessages();
       assertEquals(1, received.size());
       Message rm = (Message)received.iterator().next();
@@ -2342,6 +2392,8 @@
          // we test only non-recoverable channels now
          return;
       }
+      
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2372,6 +2424,11 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(NUMBER_OF_MESSAGES, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+      
+      log.info("list: " + listMessages);
+
       assertEqualSets(refs, r.getMessages());
    }
 
@@ -2385,6 +2442,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2416,6 +2474,10 @@
 
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(NUMBER_OF_MESSAGES, deliveringCount);
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
    }
 
@@ -2691,6 +2753,8 @@
 
       List stored = queue.browse(null);
       assertEquals(1, stored.size());
+      
+      log.info("----------------browsed: " + stored);
 
       Message sm = (Message)stored.iterator().next();
       assertFalse(sm.isReliable());
@@ -3870,6 +3934,7 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -3884,6 +3949,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -3947,6 +4015,7 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -3961,6 +4030,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -3974,6 +4046,9 @@
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       tx.commit();
 
       assertTrue(queue.browse(null).isEmpty());
@@ -3991,6 +4066,7 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4005,6 +4081,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -4018,11 +4097,17 @@
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       tx.rollback();
 
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       // acknowledge non-transactionally
       r.acknowledge(ackm, null);
 
@@ -4045,6 +4130,7 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4062,6 +4148,10 @@
       }
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -4086,6 +4176,7 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4103,6 +4194,10 @@
       }
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       Transaction tx = tr.createTransaction();
@@ -4116,6 +4211,9 @@
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       tx.commit();
 
       assertTrue(queue.browse(null).isEmpty());
@@ -4133,6 +4231,7 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4150,6 +4249,10 @@
       }
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       Transaction tx = tr.createTransaction();
@@ -4163,10 +4266,16 @@
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       tx.rollback();
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       // acknowledge non-transactionally
       for(Iterator i = r.getMessages().iterator(); i.hasNext();)
       {
@@ -4197,8 +4306,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4212,6 +4321,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -4328,8 +4440,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4343,6 +4455,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -4356,6 +4471,9 @@
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       tx.commit();
 
       assertTrue(queue.browse(null).isEmpty());
@@ -4373,8 +4491,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4388,6 +4506,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -4401,11 +4522,17 @@
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       tx.rollback();
 
       deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       // acknowledge non-transactionally
       r.acknowledge(ackm, null);
 
@@ -4428,8 +4555,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4447,6 +4574,9 @@
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -4470,8 +4600,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4488,6 +4618,10 @@
       }
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       Transaction tx = tr.createTransaction();
@@ -4501,6 +4635,9 @@
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       tx.commit();
 
       assertTrue(queue.browse(null).isEmpty());
@@ -4517,8 +4654,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4535,6 +4672,10 @@
       }
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       Transaction tx = tr.createTransaction();
@@ -4548,10 +4689,16 @@
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       tx.rollback();
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
 
+      listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       // acknowledge non-transactionally
       for(Iterator i = r.getMessages().iterator(); i.hasNext();)
       {
@@ -4583,8 +4730,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4609,6 +4756,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -4631,8 +4781,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4660,6 +4810,10 @@
       tx.commit();
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -4687,8 +4841,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4714,6 +4868,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List acknowledging = r.getMessages();
       assertEquals(1, acknowledging.size());
       Message ackm = (Message)acknowledging.get(0);
@@ -4736,6 +4893,7 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4765,6 +4923,10 @@
 
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -4787,8 +4949,8 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
-
       // add an NACKING receiver to the channel
       SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(queue.getLocalDistributor().add(r));
@@ -4818,6 +4980,10 @@
       tx.commit();
 
       assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
       assertEqualSets(refs, r.getMessages());
 
       for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -5168,6 +5334,7 @@
          // we test only non-recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // the channel has no receivers
       assertFalse(queue.getLocalDistributor().iterator().hasNext());
@@ -5193,6 +5360,9 @@
       int deliveringCount = queue.getDeliveringCount();
       assertEquals(1, deliveringCount);
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List messages = receiver.getMessages();
       assertEquals(1, messages.size());
       Message sm = (Message)messages.iterator().next();
@@ -5302,6 +5472,7 @@
          // we test only recoverable channels now
          return;
       }
+      queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
 
       // the channel has no receivers
       assertFalse(queue.getLocalDistributor().iterator().hasNext());
@@ -5326,6 +5497,9 @@
 
       assertEquals(1, queue.getDeliveringCount());
 
+      List listMessages = queue.listInProcessMessages();
+      assertEquals(1, listMessages.size());
+
       List messages = receiver.getMessages();
       assertEquals(1, messages.size());
       Message sm = (Message)messages.iterator().next();
@@ -5367,7 +5541,176 @@
       assertFalse(queue.getLocalDistributor().remove(new SimpleReceiver("INEXISTENT")));
    }
 
+   public void testDeliveringCounterStatics() throws Exception
+   { // Exercises the static conversions DeliveringCounter.toString(int) and toLevel(String).
+      try
+      {
+         DeliveringCounter.toString(-1);
+         fail("-1 is not a valid level. It should reject it and throw an exception.");
+      }
+      catch (Exception e)
+      {
+         //ok: an exception is the expected outcome for the invalid level -1
+      }
+      // each defined level constant must map to its name
+      String levelStr = DeliveringCounter.toString(DeliveringCounter.LEVEL_ALL);
+      assertEquals("ALL", levelStr);
+      
+      levelStr = DeliveringCounter.toString(DeliveringCounter.LEVEL_COUNTER);
+      assertEquals("COUNTER", levelStr);
+      
+      levelStr = DeliveringCounter.toString(DeliveringCounter.LEVEL_NONE);
+      assertEquals("NONE", levelStr);
 
+      try
+      {
+         int l = DeliveringCounter.toLevel("AABB");
+         fail("AABB is not a valid value, but it returns: " + l);
+      }
+      catch (Exception e)
+      {
+         //ok: an exception is the expected outcome for the unknown name "AABB"
+      }
+      // each valid name must map back to its level constant
+      int lv = DeliveringCounter.toLevel("ALL");
+      assertEquals(DeliveringCounter.LEVEL_ALL, lv);
+      
+      lv = DeliveringCounter.toLevel("NONE");
+      assertEquals(DeliveringCounter.LEVEL_NONE, lv);
+      
+      lv = DeliveringCounter.toLevel("COUNTER");
+      assertEquals(DeliveringCounter.LEVEL_COUNTER, lv);
+      
+   }
+
+   public void testDeliveringCounterNone() throws Exception
+   {
+      DeliveringCounter counter = new DeliveringCounter(DeliveringCounter.LEVEL_NONE);
+      // At LEVEL_NONE the test expects get() to stay 0 across increment, decrement and reset.
+      assertEquals(DeliveringCounter.LEVEL_NONE, counter.getLevel());
+      Message[] mids = new Message[1000];
+      for (int i = 0; i < 1000; i++)
+      {
+         Message m = CoreMessageFactory.createCoreMessage(i);
+         counter.increment(m);
+         mids[i] = m; // kept so the same message objects can be decremented below
+      }
+      assertEquals(0, counter.get());
+      for (int i = 0; i < 49; i++) { // decrement only the first 49 messages
+         counter.decrement(mids[i]);
+      }
+      assertEquals(0, counter.get());
+      counter.reset();
+      assertEquals(0, counter.get());
+   }
+
+   public void testDeliveringCounterDefault() throws Exception
+   {
+      DeliveringCounter counter = new DeliveringCounter();
+      // The no-arg constructor is expected to select LEVEL_COUNTER, where get() tracks the count.
+      assertEquals(DeliveringCounter.LEVEL_COUNTER, counter.getLevel());
+      Message[] mids = new Message[1000];
+      for (int i = 0; i < 1000; i++)
+      {
+         Message m = CoreMessageFactory.createCoreMessage(i);
+         counter.increment(m);
+         mids[i] = m; // kept so the same message objects can be decremented below
+      }
+      assertEquals(1000, counter.get());
+      for (int i = 0; i < 49; i++) { // decrement only the first 49 messages
+         counter.decrement(mids[i]);
+      }
+      assertEquals(951, counter.get()); // 1000 increments - 49 decrements
+      counter.reset();
+      assertEquals(0, counter.get());
+   }
+
+   public void testDeliveringCounterAll() throws Exception
+   {
+      DeliveringCounter counter = new DeliveringCounter(DeliveringCounter.LEVEL_ALL);
+      // At LEVEL_ALL the test expects both the count and the message list to be tracked.
+      assertEquals(DeliveringCounter.LEVEL_ALL, counter.getLevel());
+      Message[] mids = new Message[1000];
+      // msgs mirrors, keyed by message id, what the counter is expected to hold
+      Map<Long, Message> msgs = new HashMap<Long, Message>();
+      
+      for (int i = 0; i < 1000; i++)
+      {
+         Message m = CoreMessageFactory.createCoreMessage(i);
+         msgs.put(m.getMessageID(), m);
+         counter.increment(m);
+         mids[i] = m;
+      }
+      assertEquals(1000, counter.get());
+      for (int i = 0; i < 49; i++) { // take the first 49 messages out of counter and mirror
+         counter.decrement(mids[i]);
+         msgs.remove(mids[i].getMessageID());
+      }
+      assertEquals(951, counter.get()); // 1000 increments - 49 decrements
+      assertEquals(951, counter.getMessages().size());
+      // removing every message the counter reports must leave the mirror empty
+      List inProcMsgs = counter.getMessages();
+      for (Object o : inProcMsgs)
+      {
+         Message msg = (Message)o;
+         msgs.remove(msg.getMessageID());
+      }
+      assertEquals(0, msgs.size());
+      
+      counter.reset();
+      assertEquals(0, counter.get());
+      assertEquals(0, counter.getMessages().size());
+   }
+
+   /*
+    * Starts 10 threads: 5 putters increment the counter and 5 takers
+    * decrement it. Each group drains its own queue holding the same
+    * 50000 messages, so every message is incremented and decremented once.
+    *
+    * Once all threads have been joined the counter is expected to be empty.
+    * NOTE(review): putters and takers run concurrently, so a message may be
+    * decremented before it is incremented -- confirm DeliveringCounter copes.
+    */
+   public void testDeliveringCounterConcurrent() throws Exception
+   {
+      DeliveringCounter counter = new DeliveringCounter(DeliveringCounter.LEVEL_ALL);
+      
+      assertEquals(DeliveringCounter.LEVEL_ALL, counter.getLevel());
+      final int numMsg = 50000;
+      
+      // msgs1 feeds the putters and msgs2 the takers; both receive every message
+      final ConcurrentLinkedQueue<Message> msgs1 = new ConcurrentLinkedQueue<Message>();
+      final ConcurrentLinkedQueue<Message> msgs2 = new ConcurrentLinkedQueue<Message>();
+      
+      for (int i = 0; i < numMsg; i++)
+      {
+         Message m = CoreMessageFactory.createCoreMessage(i);
+         msgs1.add(m);
+         msgs2.add(m);
+      }
+      
+      final int numThr = 5;
+      //create and start the putter and taker threads in pairs
+      Thread[] putters = new PutterThread[numThr];
+      Thread[] takers = new TakerThread[numThr];
+      
+      for (int i = 0; i < numThr; i++) {
+         putters[i] = new PutterThread(counter, msgs1, i);
+         takers[i] = new TakerThread(counter, msgs2, i);
+         putters[i].start();
+         takers[i].start();
+      }
+      
+      for (int i = 0; i < numThr; i++) {
+         putters[i].join();
+         takers[i].join();
+      }
+
+      List msgs = counter.getMessages();
+      assertEquals(0, msgs.size());
+      assertEquals(0, counter.get());
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -5389,5 +5732,59 @@
    }
 
    // Inner classes -------------------------------------------------
+   static class PutterThread extends Thread
+   { // Worker that drains a shared message queue, calling doWork() (increment) on each message.
+      protected ConcurrentLinkedQueue<Message> messages;
+      protected DeliveringCounter counter;
+      protected int index;
+      // c: counter to update; m: queue to drain; i: selects which iterations sleep (kept modulo 10)
+      public PutterThread(DeliveringCounter c, ConcurrentLinkedQueue<Message> m, int i)
+      {
+         counter = c;
+         messages = m;
+         index = i%10;
+      }
+      // Polls until the queue is empty, pausing 1 ms whenever num % 10 == index.
+      public void run()
+      {
+         int num = 0;
+         Message m = messages.poll();
+         while (m != null)
+         {
+            num++;
+            if (num % 10 == index)
+            {
+               try
+               {
+                  Thread.sleep(1); //give some delay
+               }
+               catch (InterruptedException e)
+               {
+                  Thread.currentThread().interrupt(); // keep draining, but restore the flag instead of swallowing it
+               }
+            }
+            doWork(m);
+            m = messages.poll();
+         }
+      }
+      // Per-message action; TakerThread overrides this to decrement instead.
+      protected void doWork(Message m)
+      {
+         counter.increment(m);
+      }
+   }
 
+   static class TakerThread extends PutterThread
+   { // Same polling loop as PutterThread, but each polled message is decremented.
+      public TakerThread(DeliveringCounter c, ConcurrentLinkedQueue<Message> m, int i)
+      {
+         super(c, m, i);
+      }
+      // Replaces the inherited increment with a decrement on the same counter.
+      protected void doWork(Message m)
+      {
+         counter.decrement(m);
+      }
+      
+   }
 }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -482,9 +482,7 @@
          }
 
          assertTrue(fast.processed == numMessages - 2);
-
-         // Thread.sleep(10000);
-
+         assertTrue(slow.processed == 2);
       }
       finally
       {

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -400,6 +400,15 @@
       public void staticMerge(org.jboss.messaging.core.contract.Queue queue) throws Exception
       {
       }
+
+      public List listInProcessMessages()
+      {
+         return java.util.Collections.EMPTY_LIST; // fake tracks nothing; empty (not null) keeps size()/iteration safe
+      }
+
+      public void setDeliveringCounterLevel(int delCounterLevel)
+      {
+      } // intentionally empty: the requested level is ignored by this fake
    }
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -929,7 +929,88 @@
          deploy(config);
          undeployDestination("example/tests/myqueue/ver1");
    }
+   
+   public void testQueueDeliveringCounterLevelAttr() throws Exception
+   { // Deploys DCLQueue with each DeliveringCounterLevel setting and reads the attribute back.
+      //Default: no DeliveringCounterLevel attribute is configured; the test expects "COUNTER"
+      String config =
+         "<mbean code=\"org.jboss.jms.server.destination.QueueService\" " +
+         "       name=\"somedomain:service=Queue,name=DCLQueue\"" +
+         "       xmbean-dd=\"xmdesc/Queue-xmbean.xml\">" +
+         "    <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+         "</mbean>";
 
+      ObjectName destObjectName = deploy(config);
+
+      assertEquals("COUNTER", ServerManagement.getAttribute(destObjectName, "DeliveringCounterLevel"));
+      undeployDestination("DCLQueue");
+
+      //None: configured explicitly and expected to be read back unchanged
+      config =
+         "<mbean code=\"org.jboss.jms.server.destination.QueueService\" " +
+         "       name=\"somedomain:service=Queue,name=DCLQueue\"" +
+         "       xmbean-dd=\"xmdesc/Queue-xmbean.xml\">" +
+         "    <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+         "    <attribute name=\"DeliveringCounterLevel\">NONE</attribute>" +
+         "</mbean>";
+
+      destObjectName = deploy(config);
+
+      assertEquals("NONE", ServerManagement.getAttribute(destObjectName, "DeliveringCounterLevel"));
+      undeployDestination("DCLQueue");
+
+      //All: configured explicitly and expected to be read back unchanged
+      config =
+         "<mbean code=\"org.jboss.jms.server.destination.QueueService\" " +
+         "       name=\"somedomain:service=Queue,name=DCLQueue\"" +
+         "       xmbean-dd=\"xmdesc/Queue-xmbean.xml\">" +
+         "    <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+         "    <attribute name=\"DeliveringCounterLevel\">ALL</attribute>" +
+         "</mbean>";
+
+      destObjectName = deploy(config);
+
+      assertEquals("ALL", ServerManagement.getAttribute(destObjectName, "DeliveringCounterLevel"));
+      undeployDestination("DCLQueue");
+
+      //Counter: configured explicitly and expected to be read back unchanged
+      config =
+         "<mbean code=\"org.jboss.jms.server.destination.QueueService\" " +
+         "       name=\"somedomain:service=Queue,name=DCLQueue\"" +
+         "       xmbean-dd=\"xmdesc/Queue-xmbean.xml\">" +
+         "    <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+         "    <attribute name=\"DeliveringCounterLevel\">COUNTER</attribute>" +
+         "</mbean>";
+
+      destObjectName = deploy(config);
+
+      assertEquals("COUNTER", ServerManagement.getAttribute(destObjectName, "DeliveringCounterLevel"));
+      undeployDestination("DCLQueue");
+
+      //Error: "XXXX" is not a valid level, so deploy(config) is expected to throw
+      config =
+         "<mbean code=\"org.jboss.jms.server.destination.QueueService\" " +
+         "       name=\"somedomain:service=Queue,name=DCLQueue\"" +
+         "       xmbean-dd=\"xmdesc/Queue-xmbean.xml\">" +
+         "    <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+         "    <attribute name=\"DeliveringCounterLevel\">XXXX</attribute>" +
+         "</mbean>";
+
+      try
+      {
+         destObjectName = deploy(config);
+         fail("XXXX is not a valid value and the queue should have rejected it.");
+      }
+      catch(Exception e)
+      {
+         //ok: the invalid level was rejected with an exception
+      }
+      finally
+      {
+         undeployDestination("DCLQueue"); // NOTE(review): also runs after a failed deploy -- confirm that is safe
+      }
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java	2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java	2010-08-24 15:27:41 UTC (rev 8082)
@@ -22,7 +22,9 @@
 package org.jboss.test.messaging.util;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 import org.jboss.messaging.core.impl.message.CoreMessage;
 
@@ -59,6 +61,12 @@
                                                Map coreHeaders,
                                                Serializable payload)
    {
+      if (coreHeaders == null) {
+         coreHeaders = new HashMap();
+      }
+      if (coreHeaders.get("JBM_MESSAGE_ID") == null) {
+         coreHeaders.put("JBM_MESSAGE_ID", "ID:JBM-" + UUID.randomUUID().toString());         
+      }
       CoreMessage cm =
          new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, null);
       cm.setPayload(payload);



More information about the jboss-cvs-commits mailing list