[jboss-cvs] JBoss Messaging SVN: r8111 - branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Oct 31 23:46:54 EDT 2010


Author: gaohoward
Date: 2010-10-31 23:46:53 -0400 (Sun, 31 Oct 2010)
New Revision: 8111

Modified:
   branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
Log:
JBMESSAGING-1822 test


Modified: branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2010-10-30 06:27:35 UTC (rev 8110)
+++ branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2010-11-01 03:46:53 UTC (rev 8111)
@@ -22,17 +22,23 @@
 package org.jboss.test.messaging.jms.clustering;
 
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.naming.InitialContext;
 
+import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.test.messaging.tools.ServerManagement;
 
 /**
@@ -382,6 +388,133 @@
       }
    }
 
+   //https://jira.jboss.org/browse/JBMESSAGING-1822
+   //send 1000 messages to node 0, receive on node 1 as such
+   // 1. when receiving, kill node1
+   // 2. restart node1 and continue receive
+   // 3. repeat step 1 and 2 for 5 times
+   // check all messages are received.
+   public void testSuckFailureHandling() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      
+      Map<String, TextMessage> msgs = new ConcurrentHashMap<String, TextMessage>();
+      JBossConnectionFactory cf0;
+
+      try
+      {
+         //we need to kill node 2, only leave two nodes.
+         //otherwise messages may be merged to node2 and never sucked again.
+         ServerManagement.kill(2);
+
+         cf0 = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+         conn0 = cf0.createConnection();
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+         final int NUM_MESSAGES = 1000;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("suckmsg-" + i);
+            prod0.send(tm);
+         }
+         
+         startReceive(msgs);
+         startReceive(msgs);
+         startReceive(msgs);
+         startReceive(msgs);
+         startReceive(msgs);
+         
+         JBossConnectionFactory cf1 = (JBossConnectionFactory)ic[1].lookup("/ConnectionFactory");
+         conn1 = cf1.createConnection();
+         Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+         conn1.start();
+         
+         TextMessage rm = (TextMessage)cons1.receive(5000);
+         
+         log.info("message num sucked: " + msgs.size());
+         while (rm != null)
+         {
+            rm.acknowledge();
+            log.info("message sucked: " + rm + " text=" + rm.getText());
+            msgs.put(rm.getText(), rm);
+            rm = (TextMessage)cons1.receive(5000);
+         }
+         
+         log.info("all received: " + msgs.size());
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            if (msgs.get("suckmsg-" + i) == null)
+            {
+               log.error("==== missing: " + "suckmsg-" + i);
+            }
+         }
+
+         assertEquals(NUM_MESSAGES, msgs.size());
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
+
+   private void startReceive(final Map<String, TextMessage> msgs) throws Exception
+   {
+      JBossConnectionFactory cf1 = (JBossConnectionFactory)ic[1].lookup("/ConnectionFactory");
+      Connection conn1 = cf1.createConnection();
+      Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+      cons1.setMessageListener(new MessageListener() {
+         public void onMessage(Message m)
+         {
+            try
+            {
+               log.error("receiving " + m);
+               TextMessage tm = (TextMessage)m;
+               log.info("message sucked: " + tm + " text: " + tm);
+               msgs.put(tm.getText(), tm);
+               m.acknowledge();
+            }
+            catch (JMSException e)
+            {
+               log.error("failed to ack " + m, e);
+            }
+         }
+      });
+      conn1.start();
+
+      try
+      {
+         Thread.sleep(800);
+      }
+      catch (InterruptedException e)
+      {
+         //ignore
+      }
+      
+      //kill server 1
+      ServerManagement.kill(1);
+
+      ServerManagement.start(1, "all+http", false);
+      ServerManagement.deployQueue("testDistributedQueue", 1);
+      ic[1] = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+      queue[1] = (Queue)ic[1].lookup("queue/testDistributedQueue");
+      
+      conn1.close();
+   }
+
    // Package private ---------------------------------------------
    
    // protected ----------------------------------------------------



More information about the jboss-cvs-commits mailing list