[jboss-cvs] JBoss Messaging SVN: r5240 - in branches/Branch_JBMESSAGING_1416: tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Nov 3 04:39:49 EST 2008


Author: gaohoward
Date: 2008-11-03 04:39:48 -0500 (Mon, 03 Nov 2008)
New Revision: 5240

Modified:
   branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
   branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
Log:
JBMESSAGING-1416


Modified: branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-11-03 08:24:55 UTC (rev 5239)
+++ branches/Branch_JBMESSAGING_1416/src/main/org/jboss/messaging/core/impl/OrderingGroup.java	2008-11-03 09:39:48 UTC (rev 5240)
@@ -308,7 +308,6 @@
     */
    public long unmarkSending()
    {
-System.err.println("* * * unmarking ref : " + ref);
       if (pendingSentCount > 0)
       {
          pendingSentCount--;

Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java	2008-11-03 08:24:55 UTC (rev 5239)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupMiscTest.java	2008-11-03 09:39:48 UTC (rev 5240)
@@ -24,9 +24,12 @@
 package org.jboss.test.messaging.jms;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -53,10 +56,9 @@
 {
 
    // Constants -----------------------------------------------------
-   private final int NUM_MESSAGES = 100;
 
    // Attributes ----------------------------------------------------
-   private ArrayList<TextMessage> recvMsgs = new ArrayList<TextMessage>();
+   private HashMap<String, ArrayList<TextMessage>> recvBuffer = new HashMap<String, ArrayList<TextMessage>>();
 
    // Static --------------------------------------------------------
 
@@ -67,11 +69,6 @@
    }
 
    // Public --------------------------------------------------------
-   public void tearDown() throws Exception
-   {     
-      super.tearDown();
-      recvMsgs.clear();
-   }
 
    /*
     * Sending 5 messages and letting the 3rd and 5th messages go to dlq and 
@@ -317,10 +314,10 @@
 
 
    /*
-    * Send 10 ordering messages with default priority and 
-    * then disable ordering, then send 1 message with high
-    * priority. Make sure the last message is received first
-    * and the other is received later but ordered.
+    * First send 2 normal messages, then send 10 ordering messages with some priority and 
+    * then disable ordering, then send 2 more normal messages with high
+    * priority. Make sure the normal messages are received first
+    * and the ordered messages are received later but ordered.
     */
    public void testOrderingGroupOnOff() throws Exception
    {
@@ -395,72 +392,66 @@
    }
    
    /*
-    * create 100 ordering groups, each sending 100 messages
+    * create 10 ordering groups, each sending 100 messages
     * make sure the order of each group is guaranteed.
     */
    public void testMultipleOrderingGroups() throws Exception
    {
+      
+      final int NUM_PRODUCERS = 10;
+      final int NUM_MSG = 100;
+      JBossMessageProducer[] prods = new JBossMessageProducer[NUM_PRODUCERS];
       Connection conn = null;
       
       try
       {     
          conn = cf.createConnection();
    
-         Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-         JBossMessageProducer producer = (JBossMessageProducer)producerSess.createProducer(queue1);
-         producer.enableOrderingGroup(null);
+         Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         for (int i = 0; i < NUM_PRODUCERS; i++)
+         {
+            prods[i] = (JBossMessageProducer)producerSess.createProducer(queue1);
+            prods[i].enableOrderingGroup(null);
+         }
    
-         Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         Session consumerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
          MessageConsumer consumer = consumerSess.createConsumer(queue1);
          conn.start();
-   
-         final int NUM_MSG = 100;
-         
+
          //Send some messages
-         for (int i = 0; i < 100; ++i)
+         for (int i = 0; i < NUM_PRODUCERS; i++)
          {
-            TextMessage tm = producerSess.createTextMessage("ordering" + i);
-            producer.send(tm);
+            String key = prepareReceivingBuffer(i);
+            for (int j = 0; j < NUM_MSG; j++)
+            {
+               TextMessage tm = producerSess.createTextMessage(key + ":" + j);
+               prods[i].send(tm, DeliveryMode.PERSISTENT, j%10, Message.DEFAULT_TIME_TO_LIVE);
+            }
          }
+   
+         assertEquals(NUM_PRODUCERS, recvBuffer.size());
          
-         assertRemainingMessages(NUM_MSG);
-   
          log.trace("Sent messages");
    
-         int count = 0;
          while (true)
          {
-            Message m = consumer.receive(400);
-            if (m == null) break;
-            count++;
+            TextMessage rm = (TextMessage)consumer.receive(1000);
+            if (rm == null) break;
+            putToBuffer(rm);
          }
          
-         assertRemainingMessages(NUM_MSG);
-   
-         log.trace("Received " + count +  " messages");
-
-         //if ordering group, count should be 1.
-         assertEquals(1, count);
-   
-         consumerSess.recover();
-         
-         assertRemainingMessages(NUM_MSG);
-   
-         log.trace("Session recover called");
-   
-         TextMessage m = null;
-   
-         int i = 0;
-         for (; i < 100; ++i)
+         for (int i = 0; i < NUM_PRODUCERS; ++i)
          {
-            m = (TextMessage)consumer.receive();
-            log.trace("Received message " + i);
-            m.acknowledge();
-            assertTrue(m.getText().equals("ordering" + i));
+            String key = "ordering-" + i;
+            ArrayList<TextMessage> group = recvBuffer.get(key);
+            assertEquals(NUM_MSG, group.size());
+            for (int j = 0; j < NUM_MSG; ++j)
+            {
+               TextMessage rm = group.get(j);
+               assertEquals(key + ":" + j, rm.getText());
+            }
          }
-         
-         assertRemainingMessages(0);
-   
+
          // make sure I don't receive anything else
    
          checkEmpty(queue1);
@@ -480,6 +471,35 @@
 
    // Private -------------------------------------------------------
 
+   /** Appends rm to the recvBuffer list whose key is the part of rm's text before the first ':'.
+    * @param rm received text message; the test sends texts of the form key + ":" + index
+    * @throws JMSException if rm.getText() fails
+    */
+   private void putToBuffer(TextMessage rm) throws JMSException
+   {
+      String text = rm.getText();
+      String[] tokens = text.split(":");
+      String key = tokens[0]; // group key, i.e. everything before the first ':'
+      ArrayList<TextMessage> group = recvBuffer.get(key); // null unless the key was registered by prepareReceivingBuffer
+      group.add(rm); // throws NullPointerException when no list exists for the key
+   }
+
+   /** Ensures recvBuffer has a list for ordering group i, creating an empty one only if none exists.
+    * @param i index of the ordering group
+    * @return the buffer key, "ordering-" + i
+    */
+   private String prepareReceivingBuffer(int i)
+   {
+      String key = "ordering-" + i;
+      ArrayList<TextMessage> grpBuffer = recvBuffer.get(key);
+      if (grpBuffer == null) // no list registered for this key yet
+      {
+         grpBuffer = new ArrayList<TextMessage>();
+         recvBuffer.put(key, grpBuffer);
+      }
+      return key;
+   }
+
    private static void doze(long tm)
    {
       try




More information about the jboss-cvs-commits mailing list