[jboss-cvs] JBoss Messaging SVN: r8615 - in branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging: tools and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 20 02:21:36 EDT 2013
Author: bershath27
Date: 2013-09-20 02:21:35 -0400 (Fri, 20 Sep 2013)
New Revision: 8615
Modified:
branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
JBPAPP-10865
Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2013-09-20 06:20:53 UTC (rev 8614)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2013-09-20 06:21:35 UTC (rev 8615)
@@ -17,6 +17,7 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
@@ -1185,6 +1186,187 @@
}
}
+ /**
+ * Deploys a non-clustered queue on node 1 only, then prepares three XA
+ * transactions there: a receive (xid1) and a send (xid2) on that queue, and
+ * a send (xid3) on the distributed queue. Node 1 is killed after the prepares.
+ * Recovery on node 0 must return only xid3; the other two transactions are
+ * expected to be recovered only after node 1 is restarted (db not cleaned).
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1940
+ */
+ public void testRecoveryWithSingleDistributedTargetWithMessage() throws Exception
+ {
+ Connection conn = null;
+ XAConnection xaconn = null;
+
+ try
+ {
+ ServerManagement.deployQueue(false, "singleDeployedNonDistributedQueue", 1);
+ Queue singleNonClusteredQueue = (Queue)ic[1].lookup("queue/singleDeployedNonDistributedQueue");
+
+ conn = createConnectionOnServer(cf, 1);
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = session.createProducer(singleNonClusteredQueue);
+
+ int num = 10;
+ for (int i = 0; i < num; i++)
+ {
+ TextMessage m = session.createTextMessage("TX1-" + i);
+ prod.send(m);
+ }
+
+ xaconn = this.createXAConnectionOnServer(cf, 1);
+ xaconn.start();
+
+ XASession xasess = xaconn.createXASession();
+ XAResource res = xasess.getXAResource();
+
+ // xid1: receive all the messages inside an XA transaction and leave it prepared
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer consumer = xasess.createConsumer(singleNonClusteredQueue);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = consumer.receive(5000);
+ assertNotNull(m);
+ }
+
+ res.end(xid1, XAResource.TMSUCCESS);
+
+ res.prepare(xid1);
+
+ // xid2: send messages to the same queue inside an XA transaction and leave it prepared
+ Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli2".getBytes());
+
+ res.start(xid2, XAResource.TMNOFLAGS);
+
+ MessageProducer producer = xasess.createProducer(singleNonClusteredQueue);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = xasess.createTextMessage("TX2-" + i);
+ producer.send(m);
+ }
+
+ res.end(xid2, XAResource.TMSUCCESS);
+
+ res.prepare(xid2);
+
+ // xid3: an ordinary prepared send on the distributed queue (queue[1])
+ Xid xid3 = new MessagingXid("bq3".getBytes(), 42, "eemeli3".getBytes());
+
+ res.start(xid3, XAResource.TMNOFLAGS);
+
+ producer = xasess.createProducer(queue[1]);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = xasess.createTextMessage("TX3-" + i);
+ producer.send(m);
+ }
+
+ res.end(xid3, XAResource.TMSUCCESS);
+
+ res.prepare(xid3);
+
+ xaconn.close();
+
+ // register a failover listener on the non-XA connection before killing node 1
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+ ServerManagement.kill(1);
+
+ // wait for FAILOVER_COMPLETED; fail as soon as getEvent(30000) returns null
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(30000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ xaconn = createXAConnectionOnServer(cf, 0);
+ xaconn.start();
+
+ XAResource recoveryRes = xaconn.createXASession().getXAResource();
+ Xid[] txs = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+
+ // xid1 and xid2 must not be recovered on node 0, because
+ // their messages stay on node 1.
+ assertEquals(1, txs.length);
+
+ assertTrue(txs[0].equals(xid3));
+
+ // commit xid3 (second argument false: not a one-phase commit)
+ recoveryRes.commit(txs[0], false);
+
+ txs = recoveryRes.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, txs.length);
+
+ // the messages committed by xid3 should now be receivable from node 0
+ Session sess = xaconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = sess.createConsumer(queue[0]);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = consumer.receive(5000);
+ assertNotNull(m);
+ }
+
+ xaconn.close();
+
+ // simulate restarting node 1 without cleaning its database
+ ServerManagement.start(1, config, overrides, false);
+
+ ServerManagement.deployQueue(false, "singleDeployedNonDistributedQueue", 1);
+ ic[1] = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+
+ singleNonClusteredQueue = (Queue)ic[1].lookup("queue/singleDeployedNonDistributedQueue");
+
+ xaconn = createXAConnectionOnServer(cf, 1);
+ xaconn.start();
+
+ recoveryRes = xaconn.createXASession().getXAResource();
+ txs = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+
+ // now the two transactions left behind on node 1 should be recovered
+ assertEquals(2, txs.length);
+
+ recoveryRes.commit(txs[0], false);
+ recoveryRes.commit(txs[1], false);
+
+ txs = recoveryRes.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, txs.length);
+
+ // receive the messages that were sent under xid2
+ sess = xaconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = sess.createConsumer(singleNonClusteredQueue);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = consumer.receive(5000);
+ assertNotNull(m);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ }
+ }
+
// Inner classes --------------------------------------------------------------------------------
Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2013-09-20 06:20:53 UTC (rev 8614)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2013-09-20 06:21:35 UTC (rev 8615)
@@ -1004,6 +1004,15 @@
}
/**
+ * Deploys a queue called name on the server at serverIndex (after insureStarted), forwarding the clustered flag to that server's deployQueue.
+ */
+ public static void deployQueue(boolean clustered, String name, int serverIndex) throws Exception
+ {
+ insureStarted(serverIndex);
+ servers[serverIndex].getServer().deployQueue(name, null, clustered); // NOTE(review): null second argument is presumably the JNDI name - confirm
+ }
+
+ /**
* Simulates a queue deployment (copying the queue descriptor in the deploy directory).
*/
public static void deployQueue(String name) throws Exception
More information about the jboss-cvs-commits
mailing list