[jboss-cvs] JBoss Messaging SVN: r8111 - branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Oct 31 23:46:54 EDT 2010
Author: gaohoward
Date: 2010-10-31 23:46:53 -0400 (Sun, 31 Oct 2010)
New Revision: 8111
Modified:
branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
Log:
JBMESSAGING-1822 test
Modified: branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2010-10-30 06:27:35 UTC (rev 8110)
+++ branches/JBMESSAGING-1822/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2010-11-01 03:46:53 UTC (rev 8111)
@@ -22,17 +22,23 @@
package org.jboss.test.messaging.jms.clustering;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.test.messaging.tools.ServerManagement;
/**
@@ -382,6 +388,133 @@
}
}
+ //https://jira.jboss.org/browse/JBMESSAGING-1822
+ //send 1000 messages to node 0, receive on node 1 as such
+ // 1. when receiving, kill node1
+ // 2. restart node1 and continue receive
+ // 3. repeat step 1 and 2 for 5 times
+ // check all messages are received.
+ public void testSuckFailureHandling() throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+
+ Map<String, TextMessage> msgs = new ConcurrentHashMap<String, TextMessage>();
+ JBossConnectionFactory cf0;
+
+ try
+ {
+ //we need to kill node 2, only leave two nodes.
+ //otherwise messages may be merged to node2 and never sucked again.
+ ServerManagement.kill(2);
+
+ cf0 = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+ conn0 = cf0.createConnection();
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ final int NUM_MESSAGES = 1000;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("suckmsg-" + i);
+ prod0.send(tm);
+ }
+
+ startReceive(msgs);
+ startReceive(msgs);
+ startReceive(msgs);
+ startReceive(msgs);
+ startReceive(msgs);
+
+ JBossConnectionFactory cf1 = (JBossConnectionFactory)ic[1].lookup("/ConnectionFactory");
+ conn1 = cf1.createConnection();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+ conn1.start();
+
+ TextMessage rm = (TextMessage)cons1.receive(5000);
+
+ log.info("message num sucked: " + msgs.size());
+ while (rm != null)
+ {
+ rm.acknowledge();
+ log.info("message sucked: " + rm + " text=" + rm.getText());
+ msgs.put(rm.getText(), rm);
+ rm = (TextMessage)cons1.receive(5000);
+ }
+
+ log.info("all received: " + msgs.size());
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ if (msgs.get("suckmsg-" + i) == null)
+ {
+ log.error("==== missing: " + "suckmsg-" + i);
+ }
+ }
+
+ assertEquals(NUM_MESSAGES, msgs.size());
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+ private void startReceive(final Map<String, TextMessage> msgs) throws Exception
+ {
+ JBossConnectionFactory cf1 = (JBossConnectionFactory)ic[1].lookup("/ConnectionFactory");
+ Connection conn1 = cf1.createConnection();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+ cons1.setMessageListener(new MessageListener() {
+ public void onMessage(Message m)
+ {
+ try
+ {
+ log.error("receiving " + m);
+ TextMessage tm = (TextMessage)m;
+ log.info("message sucked: " + tm + " text: " + tm);
+ msgs.put(tm.getText(), tm);
+ m.acknowledge();
+ }
+ catch (JMSException e)
+ {
+ log.error("failed to ack " + m, e);
+ }
+ }
+ });
+ conn1.start();
+
+ try
+ {
+ Thread.sleep(800);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+
+ //kill server 1
+ ServerManagement.kill(1);
+
+ ServerManagement.start(1, "all+http", false);
+ ServerManagement.deployQueue("testDistributedQueue", 1);
+ ic[1] = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+ queue[1] = (Queue)ic[1].lookup("queue/testDistributedQueue");
+
+ conn1.close();
+ }
+
// Package private ---------------------------------------------
// protected ----------------------------------------------------
More information about the jboss-cvs-commits
mailing list