[jboss-cvs] JBoss Messaging SVN: r7870 - branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 26 08:52:48 EDT 2009
Author: gaohoward
Date: 2009-10-26 08:52:48 -0400 (Mon, 26 Oct 2009)
New Revision: 7870
Modified:
branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
Log:
.test
Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java 2009-10-26 08:58:23 UTC (rev 7869)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java 2009-10-26 12:52:48 UTC (rev 7870)
@@ -39,8 +39,10 @@
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.tx.MessagingXid;
import org.jboss.test.messaging.jms.clustering.XAFailoverTest.DummyXAResource;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.container.InVMInitialContextFactory;
@@ -116,8 +118,8 @@
sendMessages(0, cQueue, msgBase, numMsg);
sendMessages(1, nQueue, msgBase, numMsg);
- receiveMessages(0, cQueue, msgBase, 0, numMsg, null, Session.AUTO_ACKNOWLEDGE, false);
- receiveMessages(2, nQueue, msgBase, 0, numMsg, null, Session.CLIENT_ACKNOWLEDGE, false);
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, false);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, false);
}
//do a redeploy and test the topics work normally by sending some messages and receiving them.
@@ -178,11 +180,11 @@
redeployDestinations(true);
- receiveMessages(0, cQueue, msgBase, 0, numMsg, null, Session.AUTO_ACKNOWLEDGE, true);
- receiveMessages(2, nQueue, msgBase, 0, numMsg, null, Session.CLIENT_ACKNOWLEDGE, true);
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
}
- //send some messages to queues and do redeploy, then receiving them.
+ //send some messages to queues and receive a few of them. Then do redeploy and try to receive the rest.
public void testRedeployQueueNoMessageLoss2() throws Exception
{
String msgBase = "testRedeployQueueNoMessageLoss2";
@@ -193,15 +195,113 @@
sendMessages(1, cQueue, msgBase, numMsg);
sendMessages(0, nQueue, msgBase, numMsg);
- receiveMessages(0, cQueue, msgBase, 0, 10, null, Session.AUTO_ACKNOWLEDGE, false);
- receiveMessages(0, nQueue, msgBase, 0, 10, null, Session.AUTO_ACKNOWLEDGE, false);
+ receiveMessages(0, cQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
+ receiveMessages(0, nQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
redeployDestinations(true);
- receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, null, Session.AUTO_ACKNOWLEDGE, true);
- receiveMessages(2, nQueue, msgBase, 10, numMsg - 10, null, Session.CLIENT_ACKNOWLEDGE, true);
+ receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 10, numMsg - 10, Session.CLIENT_ACKNOWLEDGE, true);
}
+
+ //send some messages to queues and receive a few within a local tx (one committed, one rolled back), then redeploy and receive everything still on the queues.
+ public void testRedeployQueueNoMessageLossTX() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossTX";
+ int numMsg = 50;
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, cQueue, msgBase, 0, 10, false, "commit", false);
+ receiveMessagesTX(0, nQueue, msgBase, 0, 10, false, "rollback", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ //send some messages to queues and receive a few within an XA tx (one committed, one rolled back), then redeploy and receive everything still on the queues.
+ public void testRedeployQueueNoMessageLossXA() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossXA";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "commit", false);
+ receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "rollback", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ //send some messages to cQueue and receive a few within an XA tx that is only prepared, then redeploy, receive the rest, and recover the prepared ones by rollback.
+ public void testRedeployQueueNoMessageLossXA2() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossXA2";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(0, cQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "prepared", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, false);
+
+ recoverMessages(0, cQueue, msgBase, 0, 10);
+ }
+
+ //send some messages to nQueue and receive a few within an XA tx that is only prepared, then redeploy, receive the rest, and recover the prepared ones by rollback.
+ public void testRedeployQueueNoMessageLossXA3() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossXA3";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, nQueue, msgBase, 0, 15, true, "prepared", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(2, nQueue, msgBase, 15, numMsg - 15, Session.CLIENT_ACKNOWLEDGE, true);
+
+ recoverMessages(2, nQueue, msgBase, 0, 15);
+ }
+
+ //send some messages to queues and receive a few within an XA tx that is ended but never prepared/committed/rolled back, then redeploy and receive them all.
+ public void testRedeployQueueNoMessageLossXA4() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossXA4";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "noaction", false);
+ receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "noaction", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
/*
* Deploy the following destinations:
*
@@ -263,7 +363,6 @@
ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
}
- log.error("-----------------------------------------redeploying.......");
//redeploy
for (int i = 0; i < nodeCount; i++)
{
@@ -279,7 +378,6 @@
cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");
- log.error("-----------------------------------------redeploying.......done");
cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
try
@@ -332,83 +430,34 @@
}
private void receiveMessages(int serverIndex, Destination dest, String msgBase, int startIndex,
- int numMsg, Boolean isXA, int ack, boolean checkEmpty) throws Exception
+ int numMsg, int ack, boolean checkEmpty) throws Exception
{
Connection conn = null;
- XAConnection xaconn = null;
try
{
Session sess = null;
- XASession xasess = null;
- XAResource res = null;
-
- boolean xa = false;
-
- if (isXA == null)
- {
- //no tx
- conn = createConnectionOnServer(cf, serverIndex);
- sess = conn.createSession(false, ack);
- conn.start();
- }
- else if (isXA.booleanValue())
- {
- //xa
- xaconn = createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
- xasess = xaconn.createXASession();
- res = xasess.getXAResource();
- sess = xasess.getSession();
-
- xaconn.start();
- xa = true;
- }
- else
- {
- //local tx
- conn = createConnectionOnServer(cf, serverIndex);
- sess = conn.createSession(true, Session.SESSION_TRANSACTED);
- conn.start();
- }
- if (xa)
- {
- tm.begin();
-
- Transaction tx = tm.getTransaction();
-
- tx.enlistResource(res);
-
- //Enlist a dummy XAResource to force 2pc
- XAResource dummy = new DummyXAResource();
-
- tx.enlistResource(dummy);
- }
+ conn = createConnectionOnServer(cf, serverIndex);
+ sess = conn.createSession(false, ack);
+ conn.start();
MessageConsumer receiver = sess.createConsumer(dest);
TextMessage msg = null;
+
+ log.info("<<<<<<<<<<<< " + numMsg + " >>>>>>>>>>>>>>");
+
for (int i = 0; i < numMsg; i++)
{
msg = (TextMessage)receiver.receive(5000);
+ log.info("---------------------------------------i: " + i);
assertEquals(msgBase + (startIndex + i), msg.getText());
}
-
- if (isXA == null)
+
+ if (ack == Session.CLIENT_ACKNOWLEDGE)
{
- if (ack == Session.CLIENT_ACKNOWLEDGE)
- {
- msg.acknowledge();
- }
+ msg.acknowledge();
}
- else if (isXA.booleanValue())
- {
- //xa commit
- tm.commit();
- }
- else
- {
- sess.commit();
- }
if (checkEmpty)
{
@@ -428,10 +477,165 @@
{
conn.close();
}
+ }
+ }
+
+ /*
+ * receive messages transactionally.
+ *
+ * outcome values:
+ *
+ * commit -- commit the transaction
+ * rollback -- rollback the transaction
+ * prepared -- prepare the transaction but do not commit it (XA only).
+ * any other value (e.g. "noaction") -- neither commit, roll back nor prepare.
+ */
+ private void receiveMessagesTX(int serverIndex, Destination dest, String msgBase, int startIndex,
+ int numMsg, boolean isXA, String outcome, boolean checkEmpty) throws Exception
+ {
+ Connection conn = null;
+ XAConnection xaconn = null;
+ Session sess = null;
+ XASession xasess = null;
+ XAResource xres = null;
+ Xid xid = null;
+ try
+ {
+ if (isXA)
+ {
+ xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+ xasess = xaconn.createXASession();
+ xres = xasess.getXAResource();
+ xaconn.start();
+ sess = xasess.getSession();
+
+ xid = new MessagingXid(("bq1" + dest).getBytes(), 42, dest.toString().getBytes());
+
+ xres.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ //local tx
+ conn = createConnectionOnServer(cf, serverIndex);
+ sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ conn.start();
+ }
+
+ //starting receiving
+ MessageConsumer cons = sess.createConsumer(dest);
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)cons.receive(5000);
+ assertEquals(msgBase + (i + startIndex), rm.getText());
+ }
+
+ //ending
+ if (isXA)
+ {
+ xres.end(xid, XAResource.TMSUCCESS);
+
+ if ("commit".equals(outcome))
+ {
+ //just one-phase is enough for the test
+ xres.commit(xid, true);
+ }
+ else if ("rollback".equals(outcome))
+ {
+ xres.rollback(xid);
+ }
+ else if ("prepared".equals(outcome))
+ {
+ xres.prepare(xid);
+ }
+ }
+ else
+ {
+ //local
+ if ("commit".equals(outcome))
+ {
+ sess.commit();
+ }
+ else if ("rollback".equals(outcome))
+ {
+ sess.rollback();
+ }
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
if (xaconn != null)
{
xaconn.close();
}
- }
+ }
}
+
+ //recover the single prepared XA transaction, roll it back, and verify its messages can be received again.
+ private void recoverMessages(int serverIndex, Destination dest,
+ String msgBase, int startIndex, int numMsg) throws Exception
+ {
+ Connection conn = null;
+ XAConnection xaconn = null;
+ Session sess = null;
+ XASession xasess = null;
+ XAResource xres = null;
+
+ try
+ {
+ xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+ xasess = xaconn.createXASession();
+ xres = xasess.getXAResource();
+ xaconn.start();
+
+ Xid[] xids = xres.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+
+ Xid[] xids2 = xres.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, xids2.length);
+
+ conn = this.createConnectionOnServer(cf, serverIndex);
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+
+ MessageConsumer cons = sess.createConsumer(dest);
+ TextMessage rm = (TextMessage)cons.receive(5000);
+ assertNull(rm);
+
+ xres.rollback(xids[0]);
+
+ conn.stop();
+ conn.start();
+ for (int i = 0; i < numMsg; i++)
+ {
+ rm = (TextMessage)cons.receive(5000);
+ assertEquals(msgBase + (startIndex + i), rm.getText());
+ }
+
+ if (dest instanceof Queue)
+ {
+ checkEmpty((Queue)dest);
+ }
+ else
+ {
+ checkEmpty((Topic)dest);
+ }
+ }
+ finally
+ {
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
}
More information about the jboss-cvs-commits
mailing list