[jboss-cvs] JBoss Messaging SVN: r5240 - in branches/Branch_JBMESSAGING_1416: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 3 04:39:49 EST 2008
Author: gaohoward
Date: 2008-11-03 04:39:48 -0500 (Mon, 03 Nov 2008)
New Revision: 5240
Modified:
branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
Log:
JBMESSAGING-1416
Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2008-11-03 08:24:55 UTC (rev 5239)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2008-11-03 09:39:48 UTC (rev 5240)
@@ -308,7 +308,6 @@
*/
public long unmarkSending()
{
-System.err.println("* * * unmarking ref : " + ref);
if (pendingSentCount > 0)
{
pendingSentCount--;
Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java 2008-11-03 08:24:55 UTC (rev 5239)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java 2008-11-03 09:39:48 UTC (rev 5240)
@@ -24,9 +24,12 @@
package org.jboss.test.messaging.jms;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -53,10 +56,9 @@
{
// Constants -----------------------------------------------------
- private final int NUM_MESSAGES = 100;
// Attributes ----------------------------------------------------
- private ArrayList<TextMessage> recvMsgs = new ArrayList<TextMessage>();
+ private HashMap<String, ArrayList<TextMessage>> recvBuffer = new HashMap<String, ArrayList<TextMessage>>();
// Static --------------------------------------------------------
@@ -67,11 +69,6 @@
}
// Public --------------------------------------------------------
- public void tearDown() throws Exception
- {
- super.tearDown();
- recvMsgs.clear();
- }
/*
* Sending 5 messages and letting the 3rd and 5th messages go to dlq and
@@ -317,10 +314,10 @@
/*
- * Send 10 ordering messages with default priority and
- * then disable ordering, then send 1 message with high
- * priority. Make sure the last message is received first
- * and the other is received later but ordered.
+ * First send 2 normal messages, then send 10 ordering messages with some priority and
+ * then disable ordering, then send 2 more normal messages with high
+ * priority. Make sure the normal messages are received first
+ * and the ordered messages are received later but ordered.
*/
public void testOrderingGroupOnOff() throws Exception
{
@@ -395,72 +392,66 @@
}
/*
- * create 100 ordering groups, each sending 100 messages
+ * create 10 ordering groups, each sending 100 messages
* make sure the order of each group is guaranteed.
*/
public void testMultipleOrderingGroups() throws Exception
{
+
+ final int NUM_PRODUCERS = 10;
+ final int NUM_MSG = 100;
+ JBossMessageProducer[] prods = new JBossMessageProducer[NUM_PRODUCERS];
Connection conn = null;
try
{
conn = cf.createConnection();
- Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- JBossMessageProducer producer = (JBossMessageProducer)producerSess.createProducer(queue1);
- producer.enableOrderingGroup(null);
+ Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ for (int i = 0; i < NUM_PRODUCERS; i++)
+ {
+ prods[i] = (JBossMessageProducer)producerSess.createProducer(queue1);
+ prods[i].enableOrderingGroup(null);
+ }
- Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Session consumerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSess.createConsumer(queue1);
conn.start();
-
- final int NUM_MSG = 100;
-
+
//Send some messages
- for (int i = 0; i < 100; ++i)
+ for (int i = 0; i < NUM_PRODUCERS; i++)
{
- TextMessage tm = producerSess.createTextMessage("ordering" + i);
- producer.send(tm);
+ String key = prepareReceivingBuffer(i);
+ for (int j = 0; j < NUM_MSG; j++)
+ {
+ TextMessage tm = producerSess.createTextMessage(key + ":" + j);
+ prods[i].send(tm, DeliveryMode.PERSISTENT, j%10, Message.DEFAULT_TIME_TO_LIVE);
+ }
}
+
+ assertEquals(NUM_PRODUCERS, recvBuffer.size());
- assertRemainingMessages(NUM_MSG);
-
log.trace("Sent messages");
- int count = 0;
while (true)
{
- Message m = consumer.receive(400);
- if (m == null) break;
- count++;
+ TextMessage rm = (TextMessage)consumer.receive(1000);
+ if (rm == null) break;
+ putToBuffer(rm);
}
- assertRemainingMessages(NUM_MSG);
-
- log.trace("Received " + count + " messages");
-
- //if ordering group, count should be 1.
- assertEquals(1, count);
-
- consumerSess.recover();
-
- assertRemainingMessages(NUM_MSG);
-
- log.trace("Session recover called");
-
- TextMessage m = null;
-
- int i = 0;
- for (; i < 100; ++i)
+ for (int i = 0; i < NUM_PRODUCERS; ++i)
{
- m = (TextMessage)consumer.receive();
- log.trace("Received message " + i);
- m.acknowledge();
- assertTrue(m.getText().equals("ordering" + i));
+ String key = "ordering-" + i;
+ ArrayList<TextMessage> group = recvBuffer.get(key);
+ assertEquals(NUM_MSG, group.size());
+ for (int j = 0; j < NUM_MSG; ++j)
+ {
+ TextMessage rm = group.get(j);
+ assertEquals(key + ":" + j, rm.getText());
+ }
}
-
- assertRemainingMessages(0);
-
+
// make sure I don't receive anything else
checkEmpty(queue1);
@@ -480,6 +471,35 @@
// Private -------------------------------------------------------
+ /**
+ * @param rm
+ * @throws JMSException
+ */
+ private void putToBuffer(TextMessage rm) throws JMSException
+ {
+ String text = rm.getText();
+ String[] tokens = text.split(":");
+ String key = tokens[0];
+ ArrayList<TextMessage> group = recvBuffer.get(key);
+ group.add(rm);
+ }
+
+ /**
+ * initialize a buffer for receiving ordering group messages.
+ * @param i
+ */
+ private String prepareReceivingBuffer(int i)
+ {
+ String key = "ordering-" + i;
+ ArrayList<TextMessage> grpBuffer = recvBuffer.get(key);
+ if (grpBuffer == null)
+ {
+ grpBuffer = new ArrayList<TextMessage>();
+ recvBuffer.put(key, grpBuffer);
+ }
+ return key;
+ }
+
private static void doze(long tm)
{
try
More information about the jboss-cvs-commits
mailing list