[jboss-cvs] JBoss Messaging SVN: r5198 - branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 29 01:50:59 EDT 2008


Author: gaohoward
Date: 2008-10-29 01:50:58 -0400 (Wed, 29 Oct 2008)
New Revision: 5198

Modified:
   branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
Log:
JBMESSAGING-1416


Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java	2008-10-29 03:54:17 UTC (rev 5197)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java	2008-10-29 05:50:58 UTC (rev 5198)
@@ -24,6 +24,7 @@
 package org.jboss.test.messaging.jms;
 
 import javax.jms.Connection;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -32,8 +33,16 @@
 import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
 
 import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.jms.tx.MessagingXid;
+import org.jboss.jms.tx.ResourceManagerFactory;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
 
 /**
  * A OrderingGroupAckTest
@@ -58,7 +67,23 @@
    {
       super(name);
    }
+   
+   // TestCase overrides -------------------------------------------
 
+   protected void setUp() throws Exception
+   {
+      // if this is not set testMockCoordinatorRecoveryWithJBossTSXids will create an invalid ObjectStore
+      ServiceContainer.setupObjectStoreDir();
+      super.setUp();
+   }
+
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+      
+      ResourceManagerFactory.instance.clear();      
+   }
+
    // Public --------------------------------------------------------
    
    /*
@@ -124,7 +149,7 @@
    
    
    /*
-    * This test shows how ordering group handle client acknowledge.
+    * This test shows how ordering group handles client acknowledge.
     * the second message will never be sent out unless the first message is acked.
     */
    public void testClientAcknowledge() throws Exception
@@ -203,7 +228,280 @@
          }
       }
    }
+   
+   /*
+    * Send 4 messages, then start an XA transaction to receive them. The next message is
+    * delivered only after the transaction that consumed the previous one has been committed.
+    */
+   public void testSimpleXATransactionalReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalReceive");
 
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      try
+      {
+         //First send four ordered messages to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         //Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         //Consume the message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+         
+         //next message should not be received.
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         //prepare the tx
+         res1.prepare(xid1);
+         
+         res1.commit(xid1, false);
+         
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm2.getText());
+         
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm3.getText());
+
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNotNull(rm1);
+         assertEquals(rm1.getText(), tm4.getText());
+         
+
+         checkEmpty(queue1);
+         
+         conn1.close();
+
+         xconn1.close();
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+      }
+   }
+
+   /*
+    * Send 4 messages, then start an XA transaction to receive them.
+    * After XA recovery, the remaining messages are delivered without
+    * breaking the original order.
+    */
+   public void testSimpleXATransactionalRecoveryCommitReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalRecoveryCommitReceive");
+
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      XAConnection xconn2 = null;
+
+      try
+      {
+         //First send four ordered messages to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         //non-persistent messages would be lost on server failure
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRecoveryCommitReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         //Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         //Consume the message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         //prepare the tx
+
+         res1.prepare(xid1);
+
+         conn1.close();
+
+         xconn1.close();
+
+         conn1 = null;
+
+         xconn1 = null;
+
+         // Now "crash" the server
+
+         ServerManagement.stopServerPeer();
+
+         ServerManagement.startServerPeer();
+
+         deployAndLookupAdministeredObjects();
+
+         //Now recover
+
+         xconn2 = cf.createXAConnection();
+
+         XASession xsess2 = xconn2.createXASession();
+
+         XAResource res3 = xsess2.getXAResource();
+
+         Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = res3.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+
+         assertEquals(xid1, xids[0]);
+
+         //Commit the tx
+
+         res3.commit(xids[0], false);
+
+         //The message should be acknowledged
+
+         xconn2.close();
+
+         conn1 = cf.createConnection();
+
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+         conn1.start();
+
+         //tm2, tm3, tm4 should be received.
+         TextMessage tmr = (TextMessage)cons1.receive(1000);
+         assertNotNull(tmr);
+         assertEquals(tmr.getText(), tm2.getText());
+
+         tmr = (TextMessage)cons1.receive(1000);
+         assertNotNull(tmr);
+         assertEquals(tmr.getText(), tm3.getText());
+
+         tmr = (TextMessage)cons1.receive(1000);
+         assertNotNull(tmr);
+         assertEquals(tmr.getText(), tm4.getText());
+         
+         checkEmpty(queue1);
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list