[jboss-cvs] JBoss Messaging SVN: r8082 - in branches/Branch_1_4: integration/AS5/etc/xmdesc and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 24 11:27:43 EDT 2010
Author: gaohoward
Date: 2010-08-24 11:27:41 -0400 (Tue, 24 Aug 2010)
New Revision: 8082
Added:
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/DeliveringCounter.java
Modified:
branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml
branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml
branches/Branch_1_4/integration/EAP5/etc/xmdesc/Queue-xmbean.xml
branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedQueue.java
branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java
Log:
JBMESSAGING-1753
Modified: branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_1_4/docs/userguide/en/modules/configuration.xml 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/docs/userguide/en/modules/configuration.xml 2010-08-24 15:27:41 UTC (rev 8082)
@@ -1654,9 +1654,29 @@
<para>Returns the total number of messages in the queue = number not
being delivered + number being delivered + number being
- scheduled</para>
+ scheduled.</para>
</section>
+ <section id="conf.destination.queue.attributes.deliveringcount">
+ <title>DeliveringCount</title>
+
+ <para>Returns the number of messages currently being delivered. Depending on configuration,
+ this count may be disabled, in which case it always returns zero. See attribute DeliveringCounterLevel.
+ </para>
+ </section>
+
+ <section id="conf.destination.queue.attributes.deliveringcounterlevel">
+ <title>DeliveringCounterLevel</title>
+
+ <para>This attribute controls how the queue tracks messages that are being delivered. It has three
+ possible string values: NONE, COUNTER and ALL. NONE means the queue doesn't keep track of messages being
+ delivered at all. COUNTER means the queue keeps track of the number of messages currently being delivered,
+ but not of the messages themselves. ALL means the queue keeps track of both the number of and the
+ references to the messages being delivered. ALL is the most expensive option, as it needs additional
+ data structures to hold those references. The default is COUNTER.
+ </para>
+ </section>
+
<section id="conf.destination.queue.attributes.scheduledmessagecount">
<title>ScheduledMessageCount</title>
@@ -1789,6 +1809,18 @@
match the criteria</para>
</section>
+ <section id="conf.destination.queue.operations.listinprocessmessages">
+ <title>ListInProcessMessages</title>
+
+ <para>List all messages currently being delivered.
+ <warning>
+ To use this functionality you must set the queue's DeliveringCounterLevel attribute
+ to "ALL". Otherwise it always returns an empty list. By default the attribute
+ is set to "COUNTER".
+ </warning>
+ </para>
+ </section>
+
<section id="conf.destination.queue.operations.resetmessagecounter">
<title>ResetMessageCounter</title>
Modified: branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml 2010-08-24 15:27:41 UTC (rev 8082)
@@ -159,6 +159,12 @@
<type>int</type>
</attribute>
+ <attribute access="read-write" getMethod="getDeliveringCounterLevel" setMethod="setDeliveringCounterLevel">
+ <description>The delivering counter functionality level (ALL, COUNTER, NONE)</description>
+ <name>DeliveringCounterLevel</name>
+ <type>java.lang.String</type>
+ </attribute>
+
<attribute access="read-only" getMethod="getConsumerCount">
<description>The number of consumers on the queue</description>
<name>ConsumerCount</name>
@@ -218,6 +224,12 @@
</operation>
<operation>
+ <description>List all messages being delivered</description>
+ <name>listInProcessMessages</name>
+ <return-type>java.util.List</return-type>
+ </operation>
+
+ <operation>
<description>List durable messages</description>
<name>listDurableMessages</name>
<return-type>java.util.List</return-type>
Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml 2010-08-24 15:27:41 UTC (rev 8082)
@@ -165,6 +165,12 @@
<type>int</type>
</attribute>
+ <attribute access="read-write" getMethod="getDeliveringCounterLevel" setMethod="setDeliveringCounterLevel">
+ <description>The delivering counter functionality level (ALL, COUNTER, NONE)</description>
+ <name>DeliveringCounterLevel</name>
+ <type>java.lang.String</type>
+ </attribute>
+
<!-- instance access -->
<attribute access="read-only" getMethod="getInstance">
@@ -218,6 +224,12 @@
</operation>
<operation>
+ <description>List all messages being delivered</description>
+ <name>listInProcessMessages</name>
+ <return-type>java.util.List</return-type>
+ </operation>
+
+ <operation>
<description>List durable messages</description>
<name>listDurableMessages</name>
<return-type>java.util.List</return-type>
Modified: branches/Branch_1_4/integration/EAP5/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/xmdesc/Queue-xmbean.xml 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/integration/EAP5/etc/xmdesc/Queue-xmbean.xml 2010-08-24 15:27:41 UTC (rev 8082)
@@ -159,6 +159,12 @@
<type>int</type>
</attribute>
+ <attribute access="read-write" getMethod="getDeliveringCounterLevel" setMethod="setDeliveringCounterLevel">
+ <description>The delivering counter functionality level (ALL, COUNTER, NONE)</description>
+ <name>DeliveringCounterLevel</name>
+ <type>java.lang.String</type>
+ </attribute>
+
<attribute access="read-only" getMethod="getConsumerCount">
<description>The number of consumers on the queue</description>
<name>ConsumerCount</name>
@@ -218,6 +224,12 @@
</operation>
<operation>
+ <description>List all messages being delivered</description>
+ <name>listInProcessMessages</name>
+ <return-type>java.util.List</return-type>
+ </operation>
+
+ <operation>
<description>List durable messages</description>
<name>listDurableMessages</name>
<return-type>java.util.List</return-type>
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedQueue.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedQueue.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -30,6 +30,7 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.Queue;
+import org.jboss.messaging.core.impl.DeliveringCounter;
/**
* A ManagedQueue
@@ -58,6 +59,8 @@
private Queue queue;
+ private int deliveringCounterLevel = DeliveringCounter.LEVEL_COUNTER;
+
// Constructors ---------------------------------------------------------------------------------
public ManagedQueue()
@@ -106,6 +109,16 @@
return count;
}
+ public int getDeliveringCounterLevel()
+ {
+ return deliveringCounterLevel;
+ }
+
+ public void setDeliveringCounterLevel(int level)
+ {
+ deliveringCounterLevel = level;
+ }
+
public int getScheduledMessageCount() throws Exception
{
int count = queue.getScheduledCount();
@@ -133,6 +146,11 @@
{
return this.listMessages(ALL, selector);
}
+
+ public List listInProcessMessages() throws Exception
+ {
+ return queue.listInProcessMessages();
+ }
public List listDurableMessages(String selector) throws Exception
{
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -15,6 +15,7 @@
import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
+import org.jboss.messaging.core.impl.DeliveringCounter;
import org.jboss.messaging.core.impl.MessagingQueue;
import org.jboss.messaging.util.ExceptionUtil;
import org.jboss.messaging.util.XMLUtil;
@@ -110,6 +111,9 @@
// Must be done after load
queue.setMaxSize(destination.getMaxSize());
+ int delCounterLevel = DeliveringCounter.toLevel(getDeliveringCounterLevel());
+ queue.setDeliveringCounterLevel(delCounterLevel);
+
queue.activate();
}
else
@@ -125,8 +129,9 @@
destination.getMaxSize(), null,
destination.getFullSize(), destination.getPageSize(),
destination.getDownCacheSize(), destination.isClustered(),
- serverPeer.getRecoverDeliveriesTimeout());
- po.addBinding(new Binding(queueCond, queue, false), false);
+ serverPeer.getRecoverDeliveriesTimeout(),
+ ((ManagedQueue)destination).getDeliveringCounterLevel());
+ po.addBinding(new Binding(queueCond, queue, false), false);
queue.activate();
}
@@ -294,6 +299,29 @@
{
return ((ManagedQueue)destination).getConsumersCount();
}
+
+ public String getDeliveringCounterLevel() throws Exception
+ {
+ int level = ((ManagedQueue)destination).getDeliveringCounterLevel();
+ return DeliveringCounter.toString(level);
+ }
+
+ public void setDeliveringCounterLevel(String level) throws Exception
+ {
+ try
+ {
+ if (started)
+ {
+ log.warn("DeliveringCounterLevel can only be set before the queue is started.");
+ return;
+ }
+ ((ManagedQueue)destination).setDeliveringCounterLevel(DeliveringCounter.toLevel(level));
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMXInvocation(t, this + " getDeliveringCounterLevel");
+ }
+ }
// JMX managed operations -----------------------------------------------------------------------
@@ -333,6 +361,24 @@
}
}
+ public List listInProcessMessages() throws Exception
+ {
+ try
+ {
+ if (!started)
+ {
+ log.warn("Queue is stopped.");
+ return null;
+ }
+
+ return ((ManagedQueue)destination).listInProcessMessages();
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMXInvocation(t, this + " listInProcessMessages");
+ }
+ }
+
public List listAllMessages(String selector) throws Exception
{
try
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -98,4 +98,8 @@
void setClustered(boolean isClustered);
void staticMerge(Queue queue) throws Exception;
+
+ List listInProcessMessages();
+
+ void setDeliveringCounterLevel(int delCounterLevel);
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -97,7 +97,7 @@
//Having to keep this count requires synchronization between delivery thread and acknowledgement
//thread which will hamper concurrency
//Suggest that we have a flag that disables this for production systems
- protected SynchronizedInt deliveringCount;
+ protected DeliveringCounter deliveringCount;
protected Set scheduledDeliveries;
@@ -110,10 +110,15 @@
protected OrderingGroupMonitor monitor = new OrderingGroupMonitor();
// Constructors ---------------------------------------------------------------------------------
-
protected ChannelSupport(long channelID, PersistenceManager pm,
boolean recoverable, int maxSize)
{
+ this(channelID, pm, recoverable, maxSize, DeliveringCounter.LEVEL_COUNTER);
+ }
+
+ protected ChannelSupport(long channelID, PersistenceManager pm,
+ boolean recoverable, int maxSize, int delCounterLevel)
+ {
if (trace) { log.trace("creating " + (pm != null ? "recoverable " : "non-recoverable ") + "channel[" + channelID + "]"); }
this.pm = pm;
@@ -126,7 +131,7 @@
lock = new Object();
- deliveringCount = new SynchronizedInt(0);
+ deliveringCount = new DeliveringCounter(delCounterLevel);
scheduledDeliveries = new HashSet();
@@ -315,7 +320,7 @@
if (!del.isRecovered())
{
- deliveringCount.decrement();
+ deliveringCount.decrement(ref.getMessage());
}
if (!checkAndSchedule(ref))
@@ -335,6 +340,11 @@
{
return recoverable;
}
+
+ public List listInProcessMessages()
+ {
+ return deliveringCount.getMessages();
+ }
public List browse(Filter filter)
{
@@ -358,6 +368,7 @@
MessageReference ref = (MessageReference) i.next();
messages.add(ref.getMessage());
}
+
return messages;
}
}
@@ -433,7 +444,7 @@
}
clearAllScheduledDeliveries(true);
- deliveringCount.set(0);
+ deliveringCount.reset();
log.trace(this + " done removing all references, there are " + this.messageRefs.size());
}
@@ -535,6 +546,11 @@
}
}
}
+
+ public void setDeliveringCounterLevel(int lvl)
+ {
+ deliveringCount.setLevel(lvl);
+ }
public int getMessagesAdded()
{
@@ -730,7 +746,7 @@
}
}
- deliveringCount.increment();
+ deliveringCount.increment(ref.getMessage());
}
}
}
@@ -777,7 +793,7 @@
// Receiver accepted the reference
- deliveringCount.increment();
+ deliveringCount.increment(ref.getMessage());
return true;
}
@@ -827,7 +843,7 @@
if (!d.isRecovered())
{
- deliveringCount.decrement();
+ deliveringCount.decrement(d.getReference().getMessage());
}
MessageReference ref = d.getReference();
@@ -1047,7 +1063,7 @@
if (!del.isRecovered())
{
- deliveringCount.decrement();
+ deliveringCount.decrement(del.getReference().getMessage());
}
MessageReference ref = del.getReference();
@@ -1121,7 +1137,7 @@
{
throw new TransactionException("Failed to add reference", t);
}
- deliveringCount.decrement();
+ deliveringCount.decrement(ref.getMessage());
}
}
}
Added: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/DeliveringCounter.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/DeliveringCounter.java (rev 0)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/DeliveringCounter.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -0,0 +1,208 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2010, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Message;
+
+/**
+ * A DeliveringCounter: keeps track of delivering count and messages.
+ *
+ * Because message delivery and acknowledgement are asynchronous, a message's acknowledgement may
+ * arrive before the counter increment caused by its delivery has been recorded. For example,
+ * message m1 is delivered to a consumer and is then put into this counter. Normally the ack of m1
+ * comes later and causes the counter to decrement, so m1 is correctly added and then removed.
+ * However, there is a small chance that the ack of m1 comes in before m1 is put into this counter.
+ * In that case m1 is not there when decrement() is called. Later m1 is put into the counter and is never
+ * cleared because it has missed its ack. This is harmless for a pure delivering count (an integer),
+ * but not when retrieving the messages being delivered: in the above case the message m1 would be
+ * returned when the list is requested, even though it has already been acked.
+ *
+ * Also, supporting retrieval of in-process messages entails additional data structure operations for
+ * tracking message references, and therefore an extra performance cost. Users who don't need this
+ * functionality shouldn't have to pay that cost.
+ *
+ * So in the implementation, we use three variables:
+ *
+ * - an AtomicInteger to purely track the delivering count;
+ * - a ConcurrentHashMap to hold messages being delivered;
+ * - a ConcurrentLinkedQueue to hold messages that have been acknowledged but are not yet in the above map.
+ *
+ * In addition, an int flag controls the functionality level:
+ *
+ * - LEVEL_NONE disables the delivering counter entirely.
+ * - LEVEL_COUNTER denotes delivering count only. This is the default level.
+ * - LEVEL_ALL denotes full delivering counter functionality.
+ *
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Aug 18, 2010 1:27:46 PM
+ */
+public class DeliveringCounter
+{
+ public static final int LEVEL_NONE = 0;
+ public static final int LEVEL_COUNTER = 1;
+ public static final int LEVEL_ALL = 255;
+
+ private static final Logger log = Logger.getLogger(DeliveringCounter.class);
+
+ private int level = LEVEL_COUNTER;
+
+ private ConcurrentHashMap<Long, Message> msgCount;
+ private ConcurrentLinkedQueue<Message> earlyMsgs;
+ private AtomicInteger counter;
+
+ public DeliveringCounter()
+ {
+ this(LEVEL_COUNTER);
+ }
+
+ public DeliveringCounter(int lv)
+ {
+ initLevel(lv);
+ }
+
+ private void initLevel(int lv)
+ {
+ level = lv;
+ log.debug("Delivering Counter enabled: " + level);
+ if (level == LEVEL_NONE) return;
+ counter = new AtomicInteger(0);
+ if (level == LEVEL_ALL)
+ {
+ msgCount = new ConcurrentHashMap<Long, Message>();
+ earlyMsgs = new ConcurrentLinkedQueue<Message>();
+ }
+ }
+
+ public void decrement(Message acked)
+ {
+ if (level == LEVEL_NONE) return;
+ counter.decrementAndGet();
+ if (level != LEVEL_ALL) return;
+ Message m = msgCount.remove(acked.getMessageID());
+ if (m == null)
+ {
+ earlyMsgs.add(acked);
+ }
+ }
+
+ public void increment(Message msg)
+ {
+ if (level == LEVEL_NONE) return;
+ counter.incrementAndGet();
+ if (level != LEVEL_ALL) return;
+ msgCount.put(msg.getMessageID(), msg);
+ }
+
+ public int get()
+ {
+ if (level == LEVEL_NONE) return 0;
+ return counter.get();
+ }
+
+ public void reset()
+ {
+ if (level == LEVEL_NONE) return;
+ counter.set(0);
+ if (level != LEVEL_ALL) return;
+ earlyMsgs.clear();
+ msgCount.clear();
+ }
+
+ public synchronized List getMessages()
+ {
+ if (level != LEVEL_ALL) return new ArrayList<Message>();
+ Iterator<Message> iterId = earlyMsgs.iterator();
+ while (iterId.hasNext())
+ {
+ Message acked = iterId.next();
+ Message m = msgCount.remove(acked.getMessageID());
+ if (m != null)
+ {
+ iterId.remove();
+ }
+ }
+ Iterator<Message> iter = msgCount.values().iterator();
+ List<Message> list = new ArrayList<Message>();
+ while(iter.hasNext()) {
+ list.add(iter.next());
+ }
+ return list;
+ }
+
+ //for tests only
+ public void setLevel(int lv)
+ {
+ initLevel(lv);
+ }
+
+ //for tests only
+ public int getLevel()
+ {
+ return level;
+ }
+
+ public static String toString(int lvl) throws Exception
+ {
+ if (lvl == LEVEL_NONE)
+ {
+ return "NONE";
+ }
+ if (lvl == LEVEL_COUNTER)
+ {
+ return "COUNTER";
+ }
+ if (lvl == LEVEL_ALL)
+ {
+ return "ALL";
+ }
+ throw new Exception("Invalid value: " + lvl);
+ }
+
+ public static int toLevel(String lvl) throws Exception
+ {
+ if ("NONE".equals(lvl))
+ {
+ return LEVEL_NONE;
+ }
+ if ("COUNTER".equals(lvl))
+ {
+ return LEVEL_COUNTER;
+ }
+ if ("ALL".equals(lvl))
+ {
+ return LEVEL_ALL;
+ }
+ throw new Exception("Invalid value: \"" + lvl + "\"");
+ }
+}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -142,7 +142,17 @@
setup(nodeID, name, filter, clustered, DEFAULT_RECOVER_DELIVERIES_TIMEOUT);
}
-
+
+ public MessagingQueue(int nodeID, String name, long id, MessageStore ms, PersistenceManager pm,
+ boolean recoverable, int maxSize, Filter filter,
+ int fullSize, int pageSize, int downCacheSize, boolean clustered,
+ long recoverDeliveriesTimeout, int deliveringCounterLevel)
+ {
+ this(nodeID, name, id, ms, pm, recoverable, maxSize, filter, fullSize, pageSize, downCacheSize, clustered,
+ recoverDeliveriesTimeout);
+ this.deliveringCount.setLevel(deliveringCounterLevel);
+ }
+
private void setup(int nodeID, String name, Filter filter, boolean clustered, long recoverDeliveriesTimeout)
{
this.nodeID = nodeID;
@@ -262,7 +272,7 @@
recoveryMap.put(new Long(message.getMessageID()), re);
- deliveringCount.increment();
+ deliveringCount.increment(ref.getMessage());
iter.remove();
@@ -357,7 +367,7 @@
messageRefs.addFirst(ref, ref.getMessage().getPriority());
//Need to decrement the delivery count too
- deliveringCount.decrement();
+ deliveringCount.decrement(ref.getMessage());
}
if (trace) { log.trace("Found one, added back on queue"); }
@@ -699,7 +709,7 @@
{
messageRefs.addFirst(ref, ref.getMessage().getPriority());
- deliveringCount.decrement();
+ deliveringCount.decrement(ref.getMessage());
}
added = true;
@@ -720,4 +730,10 @@
{
clustered = isClustered;
}
+
+ //only used by tests
+ public DeliveringCounter getDeliveringCounter()
+ {
+ return deliveringCount;
+ }
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/MessagingQueueTestBase.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -23,8 +23,13 @@
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.Message;
@@ -32,6 +37,7 @@
import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.contract.Receiver;
+import org.jboss.messaging.core.impl.DeliveringCounter;
import org.jboss.messaging.core.impl.JDBCPersistenceManager;
import org.jboss.messaging.core.impl.MessagingQueue;
import org.jboss.messaging.core.impl.message.SimpleMessageStore;
@@ -1658,6 +1664,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -1672,6 +1679,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -1735,6 +1745,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -1749,6 +1760,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -1762,12 +1776,19 @@
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
tx.commit();
assertTrue(queue.browse(null).isEmpty());
deliveringCount = queue.getDeliveringCount();
assertEquals(0, deliveringCount);
+
+ listMessages = queue.listInProcessMessages();
+ assertEquals(0, listMessages.size());
+
}
@@ -1782,6 +1803,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -1796,6 +1818,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -1809,11 +1834,17 @@
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
tx.rollback();
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
// acknowledge non-transactionally
r.acknowledge(ackm, null);
@@ -1984,6 +2015,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -1998,6 +2030,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List received = r.getMessages();
assertEquals(1, received.size());
Message rm = (Message)received.iterator().next();
@@ -2033,6 +2068,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2047,6 +2083,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List received = r.getMessages();
assertEquals(1, received.size());
Message rm = (Message)received.iterator().next();
@@ -2060,6 +2099,9 @@
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
tx.commit();
assertTrue(queue.browse(null).isEmpty());
@@ -2185,6 +2227,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2210,6 +2253,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -2290,6 +2336,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2316,6 +2363,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List received = r.getMessages();
assertEquals(1, received.size());
Message rm = (Message)received.iterator().next();
@@ -2342,6 +2392,8 @@
// we test only non-recoverable channels now
return;
}
+
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2372,6 +2424,11 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(NUMBER_OF_MESSAGES, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
+ log.info("list: " + listMessages);
+
assertEqualSets(refs, r.getMessages());
}
@@ -2385,6 +2442,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -2416,6 +2474,10 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(NUMBER_OF_MESSAGES, deliveringCount);
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
}
@@ -2691,6 +2753,8 @@
List stored = queue.browse(null);
assertEquals(1, stored.size());
+
+ log.info("----------------browsed: " + stored);
Message sm = (Message)stored.iterator().next();
assertFalse(sm.isReliable());
@@ -3870,6 +3934,7 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -3884,6 +3949,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -3947,6 +4015,7 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -3961,6 +4030,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -3974,6 +4046,9 @@
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
tx.commit();
assertTrue(queue.browse(null).isEmpty());
@@ -3991,6 +4066,7 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4005,6 +4081,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -4018,11 +4097,17 @@
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
tx.rollback();
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
// acknowledge non-transactionally
r.acknowledge(ackm, null);
@@ -4045,6 +4130,7 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4062,6 +4148,10 @@
}
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -4086,6 +4176,7 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4103,6 +4194,10 @@
}
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
Transaction tx = tr.createTransaction();
@@ -4116,6 +4211,9 @@
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+ listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
tx.commit();
assertTrue(queue.browse(null).isEmpty());
@@ -4133,6 +4231,7 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4150,6 +4249,10 @@
}
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
Transaction tx = tr.createTransaction();
@@ -4163,10 +4266,16 @@
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+ listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
tx.rollback();
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+ listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
// acknowledge non-transactionally
for(Iterator i = r.getMessages().iterator(); i.hasNext();)
{
@@ -4197,8 +4306,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4212,6 +4321,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -4328,8 +4440,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4343,6 +4455,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -4356,6 +4471,9 @@
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
tx.commit();
assertTrue(queue.browse(null).isEmpty());
@@ -4373,8 +4491,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4388,6 +4506,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -4401,11 +4522,17 @@
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
tx.rollback();
deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
// acknowledge non-transactionally
r.acknowledge(ackm, null);
@@ -4428,8 +4555,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4447,6 +4574,9 @@
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -4470,8 +4600,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4488,6 +4618,10 @@
}
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
Transaction tx = tr.createTransaction();
@@ -4501,6 +4635,9 @@
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+ listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
tx.commit();
assertTrue(queue.browse(null).isEmpty());
@@ -4517,8 +4654,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4535,6 +4672,10 @@
}
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
Transaction tx = tr.createTransaction();
@@ -4548,10 +4689,16 @@
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+ listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
tx.rollback();
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+ listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
// acknowledge non-transactionally
for(Iterator i = r.getMessages().iterator(); i.hasNext();)
{
@@ -4583,8 +4730,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4609,6 +4756,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -4631,8 +4781,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4660,6 +4810,10 @@
tx.commit();
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -4687,8 +4841,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4714,6 +4868,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List acknowledging = r.getMessages();
assertEquals(1, acknowledging.size());
Message ackm = (Message)acknowledging.get(0);
@@ -4736,6 +4893,7 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
@@ -4765,6 +4923,10 @@
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -4787,8 +4949,8 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
-
// add an NACKING receiver to the channel
SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(queue.getLocalDistributor().add(r));
@@ -4818,6 +4980,10 @@
tx.commit();
assertEquals(NUMBER_OF_MESSAGES, queue.getDeliveringCount());
+
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(NUMBER_OF_MESSAGES, listMessages.size());
+
assertEqualSets(refs, r.getMessages());
for(Iterator i = r.getMessages().iterator(); i.hasNext();)
@@ -5168,6 +5334,7 @@
// we test only non-recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// the channel has no receivers
assertFalse(queue.getLocalDistributor().iterator().hasNext());
@@ -5193,6 +5360,9 @@
int deliveringCount = queue.getDeliveringCount();
assertEquals(1, deliveringCount);
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List messages = receiver.getMessages();
assertEquals(1, messages.size());
Message sm = (Message)messages.iterator().next();
@@ -5302,6 +5472,7 @@
// we test only recoverable channels now
return;
}
+ queue.getDeliveringCounter().setLevel(DeliveringCounter.LEVEL_ALL);
// the channel has no receivers
assertFalse(queue.getLocalDistributor().iterator().hasNext());
@@ -5326,6 +5497,9 @@
assertEquals(1, queue.getDeliveringCount());
+ List listMessages = queue.listInProcessMessages();
+ assertEquals(1, listMessages.size());
+
List messages = receiver.getMessages();
assertEquals(1, messages.size());
Message sm = (Message)messages.iterator().next();
@@ -5367,7 +5541,176 @@
assertFalse(queue.getLocalDistributor().remove(new SimpleReceiver("INEXISTENT")));
}
+ /**
+  * Exercises the static conversions of DeliveringCounter between numeric levels
+  * and their names: toString(int) and toLevel(String) must map ALL, COUNTER and
+  * NONE consistently, and both must throw for a value they do not know.
+  */
+ public void testDeliveringCounterStatics() throws Exception
+ {
+    // an unknown numeric level must be rejected
+    try
+    {
+       DeliveringCounter.toString(-1);
+       fail("-1 is not a valid level. It should reject it and throw an exception.");
+    }
+    catch (Exception expected)
+    {
+       //ok
+    }
+
+    // level -> name
+    assertEquals("ALL", DeliveringCounter.toString(DeliveringCounter.LEVEL_ALL));
+    assertEquals("COUNTER", DeliveringCounter.toString(DeliveringCounter.LEVEL_COUNTER));
+    assertEquals("NONE", DeliveringCounter.toString(DeliveringCounter.LEVEL_NONE));
+
+    // an unknown level name must be rejected
+    try
+    {
+       int unexpectedLevel = DeliveringCounter.toLevel("AABB");
+       fail("AABB is not a valid value, but it returns: " + unexpectedLevel);
+    }
+    catch (Exception expected)
+    {
+       //ok
+    }
+
+    // name -> level
+    assertEquals(DeliveringCounter.LEVEL_ALL, DeliveringCounter.toLevel("ALL"));
+    assertEquals(DeliveringCounter.LEVEL_NONE, DeliveringCounter.toLevel("NONE"));
+    assertEquals(DeliveringCounter.LEVEL_COUNTER, DeliveringCounter.toLevel("COUNTER"));
+ }
+
+ /**
+  * Checks a counter created with LEVEL_NONE: get() is asserted to stay at 0
+  * after 1000 increments, after 49 decrements and after reset().
+  */
+ public void testDeliveringCounterNone() throws Exception
+ {
+    final int total = 1000;
+    final int removed = 49;
+
+    DeliveringCounter counter = new DeliveringCounter(DeliveringCounter.LEVEL_NONE);
+    assertEquals(DeliveringCounter.LEVEL_NONE, counter.getLevel());
+
+    // keep the messages so the same instances can be decremented below
+    Message[] sent = new Message[total];
+    for (int i = 0; i < total; i++)
+    {
+       Message msg = CoreMessageFactory.createCoreMessage(i);
+       counter.increment(msg);
+       sent[i] = msg;
+    }
+    assertEquals(0, counter.get());
+
+    for (int i = 0; i < removed; i++)
+    {
+       counter.decrement(sent[i]);
+    }
+    assertEquals(0, counter.get());
+
+    counter.reset();
+    assertEquals(0, counter.get());
+ }
+
+ /**
+  * Checks a counter created with the no-argument constructor: its level is
+  * asserted to be LEVEL_COUNTER, and get() is asserted to follow the number of
+  * increments minus decrements (1000, then 951) and to return 0 after reset().
+  */
+ public void testDeliveringCounterDefault() throws Exception
+ {
+    final int total = 1000;
+    final int removed = 49;
+
+    DeliveringCounter counter = new DeliveringCounter();
+    assertEquals(DeliveringCounter.LEVEL_COUNTER, counter.getLevel());
+
+    // keep the messages so the same instances can be decremented below
+    Message[] sent = new Message[total];
+    for (int i = 0; i < total; i++)
+    {
+       Message msg = CoreMessageFactory.createCoreMessage(i);
+       counter.increment(msg);
+       sent[i] = msg;
+    }
+    assertEquals(total, counter.get());
+
+    for (int i = 0; i < removed; i++)
+    {
+       counter.decrement(sent[i]);
+    }
+    assertEquals(total - removed, counter.get());
+
+    counter.reset();
+    assertEquals(0, counter.get());
+ }
+
+ /**
+  * Checks a counter created with LEVEL_ALL: besides the count (1000 after the
+  * increments, 951 after 49 decrements, 0 after reset()), getMessages() is
+  * asserted to have the matching size and to contain every message that was
+  * incremented and not decremented.
+  */
+ public void testDeliveringCounterAll() throws Exception
+ {
+    final int total = 1000;
+    final int removed = 49;
+
+    DeliveringCounter counter = new DeliveringCounter(DeliveringCounter.LEVEL_ALL);
+    assertEquals(DeliveringCounter.LEVEL_ALL, counter.getLevel());
+
+    Message[] sent = new Message[total];
+
+    // the test's own record of the messages still outstanding, keyed by message id
+    Map<Long, Message> outstanding = new HashMap<Long, Message>();
+
+    for (int i = 0; i < total; i++)
+    {
+       Message msg = CoreMessageFactory.createCoreMessage(i);
+       outstanding.put(msg.getMessageID(), msg);
+       counter.increment(msg);
+       sent[i] = msg;
+    }
+    assertEquals(total, counter.get());
+
+    for (int i = 0; i < removed; i++)
+    {
+       counter.decrement(sent[i]);
+       outstanding.remove(sent[i].getMessageID());
+    }
+    assertEquals(total - removed, counter.get());
+    assertEquals(total - removed, counter.getMessages().size());
+
+    // cross off everything the counter reports; nothing outstanding may remain
+    List reported = counter.getMessages();
+    for (Object element : reported)
+    {
+       Message inProcess = (Message)element;
+       outstanding.remove(inProcess.getMessageID());
+    }
+    assertEquals(0, outstanding.size());
+
+    counter.reset();
+    assertEquals(0, counter.get());
+    assertEquals(0, counter.getMessages().size());
+ }
+
+ /*
+  * Creates 10 threads working on 50000 messages: 5 putter threads call
+  * increment() on the counter for every message they poll from one queue,
+  * and 5 taker threads call decrement() for every message they poll from a
+  * second queue. Both queues are filled with the same message instances.
+  *
+  * When those threads finish, the counter should be empty.
+  */
+ public void testDeliveringCounterConcurrent() throws Exception
+ {
+    DeliveringCounter counter = new DeliveringCounter(DeliveringCounter.LEVEL_ALL);
+
+    assertEquals(DeliveringCounter.LEVEL_ALL, counter.getLevel());
+    final int numMsg = 50000;
+
+    // msgs1 is drained by the putters, msgs2 by the takers
+    final ConcurrentLinkedQueue<Message> msgs1 = new ConcurrentLinkedQueue<Message>();
+    final ConcurrentLinkedQueue<Message> msgs2 = new ConcurrentLinkedQueue<Message>();
+
+    for (int i = 0; i < numMsg; i++)
+    {
+       Message m = CoreMessageFactory.createCoreMessage(i);
+       msgs1.add(m);
+       msgs2.add(m);
+    }
+
+    final int numThr = 5;
+    //create putter and taker threads
+    Thread[] putters = new PutterThread[numThr];
+    Thread[] takers = new TakerThread[numThr];
+
+    for (int i = 0; i < numThr; i++)
+    {
+       putters[i] = new PutterThread(counter, msgs1, i);
+       takers[i] = new TakerThread(counter, msgs2, i);
+       putters[i].start();
+       takers[i].start();
+    }
+
+    // wait until both queues have been drained completely
+    for (int i = 0; i < numThr; i++)
+    {
+       putters[i].join();
+       takers[i].join();
+    }
+
+    List msgs = counter.getMessages();
+    assertEquals(0, msgs.size());
+    assertEquals(0, counter.get());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -5389,5 +5732,59 @@
}
// Inner classes -------------------------------------------------
+ /**
+  * Worker thread that polls messages from a shared queue until it is empty and
+  * hands each one to {@link #doWork(Message)}, which here increments the
+  * counter. Before one in every ten messages (selected by the thread's index)
+  * it sleeps for 1 ms so that the threads interleave.
+  */
+ static class PutterThread extends Thread
+ {
+    protected ConcurrentLinkedQueue<Message> messages;
+    protected DeliveringCounter counter;
+    protected int index;
+
+    /**
+     * @param c the counter to work on
+     * @param m the queue of messages to drain
+     * @param i thread index; reduced modulo 10 and used to pick which
+     *          iterations are delayed
+     */
+    public PutterThread(DeliveringCounter c, ConcurrentLinkedQueue<Message> m, int i)
+    {
+       counter = c;
+       messages = m;
+       index = i%10;
+    }
+
+    public void run()
+    {
+       int num = 0;
+       Message m = messages.poll();
+       while (m != null)
+       {
+          num++;
+          //give some delay
+          if (num % 10 == index)
+          {
+             try
+             {
+                Thread.sleep(1);
+             }
+             catch (InterruptedException e)
+             {
+                // Keep draining the queue, but do not lose the interrupt:
+                // restore the flag so the owner of this thread can see it.
+                Thread.currentThread().interrupt();
+             }
+          }
+          doWork(m);
+          m = messages.poll();
+       }
+    }
+
+    /**
+     * Applies this thread's operation to one polled message; overridden by
+     * subclasses to perform a different operation.
+     */
+    protected void doWork(Message m)
+    {
+       counter.increment(m);
+    }
+ }
+ /**
+  * Counterpart of PutterThread: drains its queue in the same way, but calls
+  * decrement() on the counter for every polled message.
+  */
+ static class TakerThread extends PutterThread
+ {
+    public TakerThread(DeliveringCounter sharedCounter, ConcurrentLinkedQueue<Message> source, int threadIndex)
+    {
+       super(sharedCounter, source, threadIndex);
+    }
+
+    @Override
+    protected void doWork(Message m)
+    {
+       counter.decrement(m);
+    }
+ }
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -482,9 +482,7 @@
}
assertTrue(fast.processed == numMessages - 2);
-
- // Thread.sleep(10000);
-
+ assertTrue(slow.processed == 2);
}
finally
{
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -400,6 +400,15 @@
public void staticMerge(org.jboss.messaging.core.contract.Queue queue) throws Exception
{
}
+
+ public List listInProcessMessages() // stub implementation in this fake
+ {
+ return null; // always null; no list of in-process messages is produced here
+ }
+
+ public void setDeliveringCounterLevel(int delCounterLevel) // no-op stub: delCounterLevel is ignored
+ {
+ }
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/server/destination/QueueManagementTest.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -929,7 +929,88 @@
deploy(config);
undeployDestination("example/tests/myqueue/ver1");
}
+
+ /**
+  * Deploys the DCLQueue queue MBean with the DeliveringCounterLevel attribute
+  * omitted, then set to NONE, ALL and COUNTER, and checks the value read back
+  * from the MBean each time ("COUNTER" when omitted). Finally checks that
+  * deploying with an unknown value (XXXX) fails.
+  */
+ public void testQueueDeliveringCounterLevelAttr() throws Exception
+ {
+    //Default
+    assertDeliveringCounterLevelAttr(null, "COUNTER");
+
+    //None
+    assertDeliveringCounterLevelAttr("NONE", "NONE");
+
+    //All
+    assertDeliveringCounterLevelAttr("ALL", "ALL");
+
+    //Counter
+    assertDeliveringCounterLevelAttr("COUNTER", "COUNTER");
+
+    //Error
+    try
+    {
+       deploy(deliveringCounterLevelConfig("XXXX"));
+       fail("XXXX is not a valid value and the queue should have rejected it.");
+    }
+    catch(Exception e)
+    {
+       //ok
+    }
+    finally
+    {
+       undeployDestination("DCLQueue");
+    }
+ }
+
+ /**
+  * Deploys DCLQueue with the given DeliveringCounterLevel setting, asserts the
+  * value reported by the MBean attribute, and undeploys the queue again.
+  *
+  * @param configuredLevel value for the attribute element, or null to omit it
+  * @param expectedLevel value the DeliveringCounterLevel attribute must report
+  */
+ private void assertDeliveringCounterLevelAttr(String configuredLevel, String expectedLevel) throws Exception
+ {
+    ObjectName destObjectName = deploy(deliveringCounterLevelConfig(configuredLevel));
+    try
+    {
+       assertEquals(expectedLevel, ServerManagement.getAttribute(destObjectName, "DeliveringCounterLevel"));
+    }
+    finally
+    {
+       // undeploy even when the assertion fails, so the next deployment of
+       // the same name does not run against a leftover queue
+       undeployDestination("DCLQueue");
+    }
+ }
+
+ /**
+  * Builds the MBean descriptor for DCLQueue.
+  *
+  * @param level value of the DeliveringCounterLevel attribute element, or null
+  *        to leave the element out of the descriptor
+  */
+ private static String deliveringCounterLevelConfig(String level)
+ {
+    String config =
+       "<mbean code=\"org.jboss.jms.server.destination.QueueService\" " +
+       " name=\"somedomain:service=Queue,name=DCLQueue\"" +
+       " xmbean-dd=\"xmdesc/Queue-xmbean.xml\">" +
+       " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>";
+    if (level != null)
+    {
+       config += " <attribute name=\"DeliveringCounterLevel\">" + level + "</attribute>";
+    }
+    return config + "</mbean>";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java 2010-08-24 14:52:19 UTC (rev 8081)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java 2010-08-24 15:27:41 UTC (rev 8082)
@@ -22,7 +22,9 @@
package org.jboss.test.messaging.util;
import java.io.Serializable;
+import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import org.jboss.messaging.core.impl.message.CoreMessage;
@@ -59,6 +61,12 @@
Map coreHeaders,
Serializable payload)
{
+ if (coreHeaders == null) {
+ coreHeaders = new HashMap();
+ }
+ if (coreHeaders.get("JBM_MESSAGE_ID") == null) {
+ coreHeaders.put("JBM_MESSAGE_ID", "ID:JBM-" + UUID.randomUUID().toString());
+ }
CoreMessage cm =
new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, null);
cm.setPayload(payload);
More information about the jboss-cvs-commits
mailing list