[jboss-cvs] JBoss Messaging SVN: r5198 - branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 29 01:50:59 EDT 2008
Author: gaohoward
Date: 2008-10-29 01:50:58 -0400 (Wed, 29 Oct 2008)
New Revision: 5198
Modified:
branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
Log:
JBMESSAGING-1416
Modified: branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
===================================================================
--- branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java 2008-10-29 03:54:17 UTC (rev 5197)
+++ branches/Branch_JBMESSAGING_1416/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java 2008-10-29 05:50:58 UTC (rev 5198)
@@ -24,6 +24,7 @@
package org.jboss.test.messaging.jms;
import javax.jms.Connection;
+import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -32,8 +33,16 @@
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.jboss.jms.client.JBossMessageProducer;
+import org.jboss.jms.tx.MessagingXid;
+import org.jboss.jms.tx.ResourceManagerFactory;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
/**
* A OrderingGroupAckTest
@@ -58,7 +67,23 @@
{
super(name);
}
+
+ // TestCase overrides -------------------------------------------
+ protected void setUp() throws Exception
+ {
+ // Called ahead of super.setUp(); the original note says that otherwise testMockCoordinatorRecoveryWithJBossTSXids creates an invalid ObjectStore (NOTE(review): no test of that name is visible in this change - confirm the note still applies)
+ ServiceContainer.setupObjectStoreDir();
+ super.setUp();
+ }
+
+   /*
+    * Runs the inherited tear-down and then clears the ResourceManagerFactory singleton.
+    * The clear is done in a finally block so that it still happens when super.tearDown()
+    * throws; otherwise whatever the factory holds would be carried into the next test.
+    */
+   public void tearDown() throws Exception
+   {
+      try
+      {
+         super.tearDown();
+      }
+      finally
+      {
+         ResourceManagerFactory.instance.clear();
+      }
+   }
+
// Public --------------------------------------------------------
/*
@@ -124,7 +149,7 @@
/*
- * This test shows how ordering group handle client acknowledge.
+ * This test shows how ordering group handles client acknowledge.
* the second message will never be sent out unless the first message is acked.
*/
public void testClientAcknowledge() throws Exception
@@ -203,7 +228,280 @@
}
}
}
+
+   /*
+    * Sends four messages in one ordering group and consumes the first one inside an
+    * XA transaction. The next message must not be delivered while that transaction is
+    * still uncommitted; once it has been committed the remaining three messages must
+    * arrive in the order they were sent.
+    */
+   public void testSimpleXATransactionalReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalReceive");
+
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      try
+      {
+         // First send four messages to the queue, all in the same ordering group
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by driving the XAResource directly
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the first message inside the transaction
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // The transaction is not committed yet, so the next message must not be received
+         rm1 = (TextMessage)cons.receive(2000);
+         assertNull(rm1);
+
+         // Prepare and commit the tx
+         res1.prepare(xid1);
+
+         res1.commit(xid1, false);
+
+         // After the commit the rest of the group must arrive in send order.
+         // assertEquals takes (expected, actual) so that failure messages read correctly.
+         TextMessage[] remaining = new TextMessage[] { tm2, tm3, tm4 };
+
+         for (int i = 0; i < remaining.length; i++)
+         {
+            TextMessage received = (TextMessage)cons.receive(2000);
+            assertNotNull(received);
+            assertEquals(remaining[i].getText(), received.getText());
+         }
+
+         checkEmpty(queue1);
+
+         // Close explicitly here so that a failing close is reported by the test;
+         // the finally block below is only a safety net for earlier failures.
+         conn1.close();
+
+         xconn1.close();
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore: best-effort cleanup, must not hide the real test outcome
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore: best-effort cleanup, must not hide the real test outcome
+            }
+         }
+      }
+   }
+
+   /*
+    * Sends four persistent messages in one ordering group, consumes the first one in an
+    * XA transaction and prepares (but does not commit) it, then restarts the server peer.
+    * After the prepared transaction has been recovered and committed, the remaining three
+    * messages must be delivered without breaking the original order.
+    */
+   public void testSimpleXATransactionalRecoveryCommitReceive() throws Exception
+   {
+      log.trace("starting testSimpleXATransactionalRecoveryCommitReceive");
+
+      Connection conn1 = null;
+
+      XAConnection xconn1 = null;
+
+      XAConnection xconn2 = null;
+
+      try
+      {
+         // First send four messages to the queue, all in the same ordering group
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+         // Persistent delivery is required: non-persistent messages would be lost on server failure
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod.enableOrderingGroup("testSimpleXATransactionalRecoveryCommitReceive");
+
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+         TextMessage tm3 = sess1.createTextMessage("tm3");
+         TextMessage tm4 = sess1.createTextMessage("tm4");
+
+         prod.send(tm1);
+         prod.send(tm2);
+         prod.send(tm3);
+         prod.send(tm4);
+
+         xconn1 = cf.createXAConnection();
+
+         XASession xsess1 = xconn1.createXASession();
+
+         XAResource res1 = xsess1.getXAResource();
+
+         // Pretend to be a transaction manager by driving the XAResource directly
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = xsess1.createConsumer(queue1);
+
+         xconn1.start();
+
+         // Consume the first message inside the transaction
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         // Prepare the tx but leave it uncommitted
+         res1.prepare(xid1);
+
+         conn1.close();
+
+         xconn1.close();
+
+         // Already closed: null them so the finally block does not close them again
+         conn1 = null;
+
+         xconn1 = null;
+
+         // Now "crash" the server
+         ServerManagement.stopServerPeer();
+
+         ServerManagement.startServerPeer();
+
+         deployAndLookupAdministeredObjects();
+
+         // Now recover
+         xconn2 = cf.createXAConnection();
+
+         XASession xsess2 = xconn2.createXASession();
+
+         XAResource res3 = xsess2.getXAResource();
+
+         Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = res3.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+
+         assertEquals(xid1, xids[0]);
+
+         // Commit the recovered tx; the first message should now be acknowledged
+         res3.commit(xids[0], false);
+
+         xconn2.close();
+
+         xconn2 = null;
+
+         conn1 = cf.createConnection();
+
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+         conn1.start();
+
+         // tm2, tm3, tm4 should be received, in that order.
+         // assertEquals takes (expected, actual) so that failure messages read correctly.
+         TextMessage[] remaining = new TextMessage[] { tm2, tm3, tm4 };
+
+         for (int i = 0; i < remaining.length; i++)
+         {
+            TextMessage tmr = (TextMessage)cons1.receive(1000);
+            assertNotNull(tmr);
+            assertEquals(remaining[i].getText(), tmr.getText());
+         }
+
+         checkEmpty(queue1);
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore: best-effort cleanup, must not hide the real test outcome
+            }
+         }
+
+         if (xconn1 != null)
+         {
+            try
+            {
+               xconn1.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore: best-effort cleanup, must not hide the real test outcome
+            }
+         }
+
+         // Still open only if a recovery step or assertion failed before the explicit close
+         if (xconn2 != null)
+         {
+            try
+            {
+               xconn2.close();
+            }
+            catch (Exception e)
+            {
+               // Ignore: best-effort cleanup, must not hide the real test outcome
+            }
+         }
+      }
+   }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list