[jboss-cvs] JBoss Messaging SVN: r7870 - branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 26 08:52:48 EDT 2009


Author: gaohoward
Date: 2009-10-26 08:52:48 -0400 (Mon, 26 Oct 2009)
New Revision: 7870

Modified:
   branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
Log:
.test


Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2009-10-26 08:58:23 UTC (rev 7869)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2009-10-26 12:52:48 UTC (rev 7870)
@@ -39,8 +39,10 @@
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
 
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.tx.MessagingXid;
 import org.jboss.test.messaging.jms.clustering.XAFailoverTest.DummyXAResource;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.container.InVMInitialContextFactory;
@@ -116,8 +118,8 @@
       sendMessages(0, cQueue, msgBase, numMsg);
       sendMessages(1, nQueue, msgBase, numMsg);
       
-      receiveMessages(0, cQueue, msgBase, 0, numMsg, null, Session.AUTO_ACKNOWLEDGE, false);
-      receiveMessages(2, nQueue, msgBase, 0, numMsg, null, Session.CLIENT_ACKNOWLEDGE, false);
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, false);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, false);
    }
 
    //do a redeploy and test the topics work normally by sending some messages and receiving them.
@@ -178,11 +180,11 @@
       
       redeployDestinations(true);
       
-      receiveMessages(0, cQueue, msgBase, 0, numMsg, null, Session.AUTO_ACKNOWLEDGE, true);
-      receiveMessages(2, nQueue, msgBase, 0, numMsg, null, Session.CLIENT_ACKNOWLEDGE, true);
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
    }
    
-   //send some messages to queues and do redeploy, then receiving them.
+   //send some messages to queues and receive a few of them. Then do redeploy and try to receive the rest.
    public void testRedeployQueueNoMessageLoss2() throws Exception
    {
       String msgBase = "testRedeployQueueNoMessageLoss2";
@@ -193,15 +195,113 @@
       sendMessages(1, cQueue, msgBase, numMsg);
       sendMessages(0, nQueue, msgBase, numMsg);
       
-      receiveMessages(0, cQueue, msgBase, 0, 10, null, Session.AUTO_ACKNOWLEDGE, false);
-      receiveMessages(0, nQueue, msgBase, 0, 10, null, Session.AUTO_ACKNOWLEDGE, false);
+      receiveMessages(0, cQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
+      receiveMessages(0, nQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
       
       redeployDestinations(true);
       
-      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, null, Session.AUTO_ACKNOWLEDGE, true);
-      receiveMessages(2, nQueue, msgBase, 10, numMsg - 10, null, Session.CLIENT_ACKNOWLEDGE, true);
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 10, numMsg - 10, Session.CLIENT_ACKNOWLEDGE, true);
    }
+   
+   //send messages to queues, receive a few within a local tx (commit on one queue, rollback on the other), then redeploy and receive what is left.
+   public void testRedeployQueueNoMessageLossTX() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossTX";
+      int numMsg = 50;
 
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, false, "commit", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, false, "rollback", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //send messages to queues, receive a few within an XA tx (commit on one queue, rollback on the other), then redeploy and receive what is left.
+   public void testRedeployQueueNoMessageLossXA() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "commit", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "rollback", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //send messages to cQueue, receive a few within an XA tx that is only prepared, then redeploy, receive the rest and recover the prepared ones.
+   public void testRedeployQueueNoMessageLossXA2() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA2";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(0, cQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "prepared", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, false);
+      
+      recoverMessages(0, cQueue, msgBase, 0, 10);
+   }
+
+   //send messages to nQueue, receive a few within an XA tx that is only prepared, then redeploy, receive the rest and recover the prepared ones.
+   public void testRedeployQueueNoMessageLossXA3() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA3";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, nQueue, msgBase, 0, 15, true, "prepared", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(2, nQueue, msgBase, 15, numMsg - 15, Session.CLIENT_ACKNOWLEDGE, true);
+      
+      recoverMessages(2, nQueue, msgBase, 0, 15);
+   }
+
+   //send messages to queues, receive a few within an XA tx that is ended but never completed, then redeploy and receive them all.
+   public void testRedeployQueueNoMessageLossXA4() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA4";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "noaction", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "noaction", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
    /*
     * Deploy the following destinations:
     * 
@@ -263,7 +363,6 @@
          ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
       }
       
-      log.error("-----------------------------------------redeploying.......");
       //redeploy
       for (int i = 0; i < nodeCount; i++)
       {
@@ -279,7 +378,6 @@
       cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
       nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");  
       
-      log.error("-----------------------------------------redeploying.......done");
       cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
 
       try
@@ -332,83 +430,34 @@
    }
 
    private void receiveMessages(int serverIndex, Destination dest, String msgBase, int startIndex,
-                                int numMsg, Boolean isXA, int ack, boolean checkEmpty) throws Exception
+                                int numMsg, int ack, boolean checkEmpty) throws Exception
    {
       Connection conn = null;
-      XAConnection xaconn = null;
       
       try
       {
          Session sess = null;
-         XASession xasess = null;
-         XAResource res = null;
-         
-         boolean xa = false;
-         
-         if (isXA == null)
-         {
-            //no tx
-            conn = createConnectionOnServer(cf, serverIndex);
-            sess = conn.createSession(false, ack);
-            conn.start();
-         }
-         else if (isXA.booleanValue())
-         {
-            //xa
-            xaconn = createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
-            xasess = xaconn.createXASession();
-            res = xasess.getXAResource();
-            sess = xasess.getSession();
-            
-            xaconn.start();
-            xa = true;
-         }
-         else
-         {
-            //local tx
-            conn = createConnectionOnServer(cf, serverIndex);
-            sess = conn.createSession(true, Session.SESSION_TRANSACTED);
-            conn.start();
-         }
 
-         if (xa)
-         {
-            tm.begin();
-            
-            Transaction tx = tm.getTransaction();
-            
-            tx.enlistResource(res);
-            
-            //Enlist a dummy XAResource to force 2pc
-            XAResource dummy = new DummyXAResource();        
-            
-            tx.enlistResource(dummy);
-         }
+         conn = createConnectionOnServer(cf, serverIndex);
+         sess = conn.createSession(false, ack);
+         conn.start();
          
          MessageConsumer receiver = sess.createConsumer(dest);
          TextMessage msg = null;
+         
+         log.info("<<<<<<<<<<<< " + numMsg + " >>>>>>>>>>>>>>");
+         
          for (int i = 0; i < numMsg; i++)
          {
             msg = (TextMessage)receiver.receive(5000);
+            log.info("---------------------------------------i: " + i);
             assertEquals(msgBase + (startIndex + i), msg.getText());
          }
-         
-         if (isXA == null)
+
+         if (ack == Session.CLIENT_ACKNOWLEDGE)
          {
-            if (ack == Session.CLIENT_ACKNOWLEDGE)
-            {
-               msg.acknowledge();
-            }
+            msg.acknowledge();
          }
-         else if (isXA.booleanValue())
-         {
-            //xa commit
-            tm.commit();
-         }
-         else
-         {
-            sess.commit();
-         }
          
          if (checkEmpty)
          {
@@ -428,10 +477,165 @@
          {
             conn.close();
          }
+      }
+   }
+   
+   /*
+    * receive messages transactionally.
+    * 
+    * outcome values:
+    * 
+    * commit -- commit the transaction
+    * rollback -- rollback the transaction
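+    * prepared -- prepare the transaction but do not commit it (only handled when isXA is true).
+    * any other value (e.g. "noaction") -- leave the transaction neither committed nor rolled back.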
+    */
+   private void receiveMessagesTX(int serverIndex, Destination dest, String msgBase, int startIndex, 
+                                  int numMsg, boolean isXA, String outcome, boolean checkEmpty) throws Exception
+   {
+      Connection conn = null;
+      XAConnection xaconn = null;
+      Session sess = null;
+      XASession xasess = null;
+      XAResource xres = null;
+      Xid xid = null;
+      try
+      {
+      if (isXA)
+      {
+         xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+         xasess = xaconn.createXASession();
+         xres = xasess.getXAResource();
+         xaconn.start();
+         sess = xasess.getSession();
+
+         xid = new MessagingXid(("bq1" + dest).getBytes(), 42, dest.toString().getBytes());
+
+         xres.start(xid, XAResource.TMNOFLAGS);
+      }
+      else
+      {
+         //local tx
+         conn = createConnectionOnServer(cf, serverIndex);
+         sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+         conn.start();
+      }
+      
+      //starting receiving
+      MessageConsumer cons = sess.createConsumer(dest);
+      for (int i = 0; i < numMsg; i++)
+      {
+         TextMessage rm = (TextMessage)cons.receive(5000);
+         assertEquals(msgBase + (i + startIndex), rm.getText());
+      }
+      
+      //ending
+      if (isXA)
+      {
+         xres.end(xid, XAResource.TMSUCCESS);
+         
+         if ("commit".equals(outcome))
+         {
+            //just one-phase is enough for the test
+            xres.commit(xid, true);
+         }
+         else if ("rollback".equals(outcome))
+         {
+            xres.rollback(xid);
+         }
+         else if ("prepared".equals(outcome))
+         {
+            xres.prepare(xid);
+         }
+      }
+      else
+      {
+         //local
+         if ("commit".equals(outcome))
+         {
+            sess.commit();
+         }
+         else if ("rollback".equals(outcome))
+         {
+            sess.rollback();
+         }
+      }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
          if (xaconn != null)
          {
             xaconn.close();
          }
-      }      
+      }
    }
+
+   //recover the single prepared XA transaction, roll it back and check that its messages are delivered again.
+   private void recoverMessages(int serverIndex, Destination dest, 
+                                String msgBase, int startIndex, int numMsg) throws Exception
+   {
+      Connection conn = null;
+      XAConnection xaconn = null;
+      Session sess = null;
+      XASession xasess = null;
+      XAResource xres = null;
+
+      try
+      {
+         xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+         xasess = xaconn.createXASession();
+         xres = xasess.getXAResource();
+         xaconn.start();
+         
+         Xid[] xids = xres.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = xres.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+         
+         conn = this.createConnectionOnServer(cf, serverIndex);
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn.start();
+         
+         MessageConsumer cons = sess.createConsumer(dest);
+         TextMessage rm = (TextMessage)cons.receive(5000);
+         assertNull(rm);
+                  
+         xres.rollback(xids[0]);
+
+         conn.stop();
+         conn.start();
+         for (int i = 0; i < numMsg; i++)
+         {
+            rm = (TextMessage)cons.receive(5000);
+            assertEquals(msgBase + (startIndex + i), rm.getText());
+         }
+
+         if (dest instanceof Queue)
+         {
+            checkEmpty((Queue)dest);
+         }
+         else
+         {
+            checkEmpty((Topic)dest);
+         }
+      }
+      finally
+      {
+         if (xaconn != null)
+         {
+            xaconn.close();
+         }
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+
 }




More information about the jboss-cvs-commits mailing list