[jboss-cvs] JBoss Messaging SVN: r8615 - in branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging: tools and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 20 02:21:36 EDT 2013


Author: bershath27
Date: 2013-09-20 02:21:35 -0400 (Fri, 20 Sep 2013)
New Revision: 8615

Modified:
   branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
   branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
JBPAPP-10865

Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2013-09-20 06:20:53 UTC (rev 8614)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2013-09-20 06:21:35 UTC (rev 8615)
@@ -17,6 +17,7 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.ServerSession;
 import javax.jms.ServerSessionPool;
 import javax.jms.Session;
@@ -1185,6 +1186,187 @@
       }
    }
 
+   /**
+    * deploy a distributed queue on only one node and send some messages ,
+    * receive the messages within a XA transaction, crash the node after 
+    * tx prepared. Then simulate XA recovery on the other node. The expected
+    * behavior is that the recovery won't pick up the messages, which are
+    * only able to be recovered on the original node.
+    * https://jira.jboss.org/jira/browse/JBMESSAGING-1940
+    */
+   public void testRecoveryWithSingleDistributedTargetWithMessage() throws Exception
+   {
+      Connection conn = null;
+      XAConnection xaconn = null;
+
+      try
+      {
+         ServerManagement.deployQueue(false, "singleDeployedNonDistributedQueue", 1);
+         Queue singleNonClusteredQueue = (Queue)ic[1].lookup("queue/singleDeployedNonDistributedQueue");
+
+         conn = createConnectionOnServer(cf, 1);
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = session.createProducer(singleNonClusteredQueue);
+
+         int num = 10;
+         for (int i = 0; i < num; i++)
+         {
+            TextMessage m = session.createTextMessage("TX1-" + i);
+            prod.send(m);
+         }
+
+         xaconn = this.createXAConnectionOnServer(cf,  1);
+         xaconn.start();
+         
+         XASession xasess = xaconn.createXASession();
+         XAResource res = xasess.getXAResource();
+
+         //tx - receive - prepared
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer consumer = xasess.createConsumer(singleNonClusteredQueue);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = consumer.receive(5000);
+            assertNotNull(m);
+         }
+
+         res.end(xid1, XAResource.TMSUCCESS);
+         
+         res.prepare(xid1);
+
+         //tx - send - prepared
+         Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli2".getBytes());
+
+         res.start(xid2, XAResource.TMNOFLAGS);
+
+         MessageProducer producer = xasess.createProducer(singleNonClusteredQueue);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = xasess.createTextMessage("TX2-" + i);
+            producer.send(m);
+         }
+
+         res.end(xid2, XAResource.TMSUCCESS);
+         
+         res.prepare(xid2);
+
+         //normal tx on a distributed queue
+         Xid xid3 = new MessagingXid("bq3".getBytes(), 42, "eemeli3".getBytes());
+
+         res.start(xid3, XAResource.TMNOFLAGS);
+
+         producer = xasess.createProducer(queue[1]);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = xasess.createTextMessage("TX3-" + i);
+            producer.send(m);
+         }
+
+         res.end(xid3, XAResource.TMSUCCESS);
+         
+         res.prepare(xid3);
+         
+         xaconn.close();
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+         ServerManagement.kill(1);
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(30000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         
+         xaconn = createXAConnectionOnServer(cf,  0);
+         xaconn.start();
+         
+         XAResource recoveryRes = xaconn.createXASession().getXAResource();
+         Xid[] txs = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+         
+         //the transactions xid1 and xid2 shouldn't be recovered on node 0, because 
+         //its messages stay at node 1.
+         assertEquals(1, txs.length);
+
+         assertTrue(txs[0].equals(xid3));
+         
+         //commit
+         recoveryRes.commit(txs[0], false);
+
+         txs = recoveryRes.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, txs.length);
+         
+         //now receive messages
+         Session sess = xaconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer = sess.createConsumer(queue[0]);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = consumer.receive(5000);
+            assertNotNull(m);
+         }
+
+         xaconn.close();
+         
+         //simulate restarting node 1, do not clean db
+         ServerManagement.start(1, config, overrides, false);
+
+         ServerManagement.deployQueue(false, "singleDeployedNonDistributedQueue", 1);
+         ic[1] = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+
+         singleNonClusteredQueue = (Queue)ic[1].lookup("queue/singleDeployedNonDistributedQueue");
+
+         xaconn = createXAConnectionOnServer(cf,  1);
+         xaconn.start();
+         
+         recoveryRes = xaconn.createXASession().getXAResource();
+         txs = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+         
+         //now the tx should get recovered.
+         assertEquals(2, txs.length);
+         
+         recoveryRes.commit(txs[0], false);
+         recoveryRes.commit(txs[1], false);
+         
+         txs = recoveryRes.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, txs.length);
+         
+         //receive message on xid2
+         sess = xaconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer = sess.createConsumer(singleNonClusteredQueue);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = consumer.receive(5000);
+            assertNotNull(m);
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (xaconn != null)
+         {
+            xaconn.close();
+         }
+      }
+   }
+
    
    // Inner classes --------------------------------------------------------------------------------
 

Modified: branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2013-09-20 06:20:53 UTC (rev 8614)
+++ branches/Branch_JBossMessaging_1_4_8_SP9_JBMESSAGING-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2013-09-20 06:21:35 UTC (rev 8615)
@@ -1004,6 +1004,15 @@
    }
 
    /**
+    * Simulates a queue deployment
+    */
+   public static void deployQueue(boolean clustered, String name, int serverIndex) throws Exception
+   {
+      insureStarted(serverIndex);
+      servers[serverIndex].getServer().deployQueue(name, null, clustered);
+   }
+
+   /**
     * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
     */
    public static void deployQueue(String name) throws Exception



More information about the jboss-cvs-commits mailing list