[jboss-cvs] JBoss Messaging SVN: r4918 - trunk/tests/src/org/jboss/messaging/tests/integration/xa.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Sep 8 06:18:19 EDT 2008
Author: ataylor
Date: 2008-09-08 06:18:19 -0400 (Mon, 08 Sep 2008)
New Revision: 4918
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1299 - reenabled xa recovery with tests - added more tests
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-09-08 09:34:55 UTC (rev 4917)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-09-08 10:18:19 UTC (rev 4918)
@@ -21,17 +21,7 @@
*/
package org.jboss.messaging.tests.integration.xa;
-import java.io.File;
-import java.util.Arrays;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.*;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -44,13 +34,21 @@
import org.jboss.messaging.util.SimpleString;
import org.jboss.util.id.GUID;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.Arrays;
+
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
public class BasicXaRecoveryTest extends UnitTestCase
{
- protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/journal";
- protected String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/bindings";
+ private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+ private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+ private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/journal";
+ private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/bindings";
private MessagingService messagingService;
private ClientSession clientSession;
private ClientProducer clientProducer;
@@ -70,13 +68,14 @@
configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
- TransportConfiguration transportConfig = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory");
+
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
//start the server
messagingService.start();
//then we create a client as normal
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
clientSession = sessionFactory.createSession(true, false, false, 1, false);
clientSession.createQueue(atestq, atestq, null, true, true);
clientProducer = clientSession.createProducer(atestq);
@@ -113,6 +112,116 @@
public void testBasicSendWithCommit() throws Exception
{
+ testBasicSendWithCommit(false);
+ }
+
+ public void testBasicSendWithCommitWithServerStopped() throws Exception
+ {
+ testBasicSendWithCommit(true);
+ }
+
+ public void testBasicSendWithRollback() throws Exception
+ {
+ testBasicSendWithRollback(false);
+ }
+
+ public void testBasicSendWithRollbackWithServerStopped() throws Exception
+ {
+ testBasicSendWithRollback(true);
+ }
+
+ public void testMultipleBeforeSendWithCommit() throws Exception
+ {
+ testMultipleBeforeSendWithCommit(false);
+ }
+
+ public void testMultipleBeforeSendWithCommitWithServerStopped() throws Exception
+ {
+ testMultipleBeforeSendWithCommit(true);
+ }
+
+ public void testMultipleTxSendWithCommit() throws Exception
+ {
+ testMultipleTxSendWithCommit(false);
+ }
+
+ public void testMultipleTxSendWithCommitWithServerStopped() throws Exception
+ {
+ testMultipleTxSendWithCommit(true);
+ }
+
+ public void testMultipleTxSendWithRollback() throws Exception
+ {
+ testMultipleTxSendWithRollback(false);
+ }
+
+ public void testMultipleTxSendWithRollbackWithServerStopped() throws Exception
+ {
+ testMultipleTxSendWithRollback(true);
+ }
+
+ public void testMultipleTxSendWithCommitAndRollback() throws Exception
+ {
+ testMultipleTxSendWithCommitAndRollback(false);
+ }
+
+ public void testMultipleTxSendWithCommitAndRollbackWithServerStopped() throws Exception
+ {
+ testMultipleTxSendWithCommitAndRollback(true);
+ }
+
+ public void testMultipleTxSameXidSendWithCommit() throws Exception
+ {
+ testMultipleTxSameXidSendWithCommit(false);
+ }
+
+ public void testMultipleTxSameXidSendWithCommitWithServerStopped() throws Exception
+ {
+ testMultipleTxSameXidSendWithCommit(true);
+ }
+
+ public void testBasicReceiveWithCommit() throws Exception
+ {
+ testBasicReceiveWithCommit(false);
+ }
+
+ public void testBasicReceiveWithCommitWithServerStopped() throws Exception
+ {
+ testBasicReceiveWithCommit(true);
+ }
+
+ public void testBasicReceiveWithRollback() throws Exception
+ {
+ testBasicReceiveWithRollback(false);
+ }
+
+ public void testBasicReceiveWithRollbackWithServerStopped() throws Exception
+ {
+ testBasicReceiveWithRollback(true);
+ }
+
+ public void testMultipleTxReceiveWithCommit() throws Exception
+ {
+ testMultipleTxReceiveWithCommit(false);
+ }
+
+ public void testMultipleTxReceiveWithCommitWithServerStopped() throws Exception
+ {
+ testMultipleTxReceiveWithCommit(true);
+ }
+
+ public void testMultipleTxReceiveWithRollback() throws Exception
+ {
+ testMultipleTxReceiveWithRollback(false);
+ }
+
+ public void testMultipleTxReceiveWithRollbackWithServerStopped() throws Exception
+ {
+ testMultipleTxReceiveWithRollback(true);
+ }
+
+ public void testBasicSendWithCommit(boolean stopServer) throws Exception
+ {
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
ClientMessage m1 = createTextMessage("m1");
@@ -128,7 +237,14 @@
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- stopAndRestartServer();
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
@@ -154,7 +270,7 @@
assertEquals(m.getBody().getString(), "m4");
}
- public void testBasicSendWithRollback() throws Exception
+ public void testBasicSendWithRollback(boolean stopServer) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
@@ -171,7 +287,14 @@
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- stopAndRestartServer();
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
@@ -187,7 +310,7 @@
assertNull(m);
}
- public void testMultipleBeforeSendWithCommit() throws Exception
+ public void testMultipleBeforeSendWithCommit(boolean stopServer) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
ClientMessage m1 = createTextMessage("m1");
@@ -213,7 +336,14 @@
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- stopAndRestartServer();
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
@@ -239,7 +369,7 @@
assertEquals(m.getBody().getString(), "m8");
}
- public void testMultipleTxSendWithCommit() throws Exception
+ public void testMultipleTxSendWithCommit(boolean stopServer) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
Xid xid2 = new XidImpl("xa2".getBytes(), 1, new GUID().toString().getBytes());
@@ -269,7 +399,14 @@
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- stopAndRestartServer();
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
@@ -306,9 +443,10 @@
assertEquals(m.getBody().getString(), "m4");
}
- public void testMultipleTxSameXidSendWithCommit() throws Exception
+ public void testMultipleTxSendWithRollback(boolean stopServer) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+ Xid xid2 = new XidImpl("xa2".getBytes(), 1, new GUID().toString().getBytes());
ClientMessage m1 = createTextMessage("m1");
ClientMessage m2 = createTextMessage("m2");
ClientMessage m3 = createTextMessage("m3");
@@ -319,6 +457,121 @@
ClientMessage m8 = createTextMessage("m8");
ClientSession clientSession2 = sessionFactory.createSession(true, false, true, 1, false);
ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+ clientSession2.start(xid2, XAResource.TMNOFLAGS);
+ clientProducer2.send(m1);
+ clientProducer2.send(m2);
+ clientProducer2.send(m3);
+ clientProducer2.send(m4);
+ clientSession2.end(xid2, XAResource.TMSUCCESS);
+ clientSession2.prepare(xid2);
+ clientSession2.close();
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m5);
+ clientProducer.send(m6);
+ clientProducer.send(m7);
+ clientProducer.send(m8);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
+
+ Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+
+ assertEquals(xids.length, 2);
+ assertEqualXids(xids, xid, xid2);
+ xids = clientSession.recover(XAResource.TMENDRSCAN);
+ assertEquals(xids.length, 0);
+ clientSession.rollback(xid);
+ clientSession.rollback(xid2);
+ clientSession.start();
+ ClientMessage m = clientConsumer.receive(1000);
+ assertNull(m);
+ }
+
+ public void testMultipleTxSendWithCommitAndRollback(boolean stopServer) throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+ Xid xid2 = new XidImpl("xa2".getBytes(), 1, new GUID().toString().getBytes());
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ ClientMessage m5 = createTextMessage("m5");
+ ClientMessage m6 = createTextMessage("m6");
+ ClientMessage m7 = createTextMessage("m7");
+ ClientMessage m8 = createTextMessage("m8");
+ ClientSession clientSession2 = sessionFactory.createSession(true, false, true, 1, false);
+ ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+ clientSession2.start(xid2, XAResource.TMNOFLAGS);
+ clientProducer2.send(m1);
+ clientProducer2.send(m2);
+ clientProducer2.send(m3);
+ clientProducer2.send(m4);
+ clientSession2.end(xid2, XAResource.TMSUCCESS);
+ clientSession2.prepare(xid2);
+ clientSession2.close();
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m5);
+ clientProducer.send(m6);
+ clientProducer.send(m7);
+ clientProducer.send(m8);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
+
+ Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+
+ assertEquals(xids.length, 2);
+ assertEqualXids(xids, xid, xid2);
+ xids = clientSession.recover(XAResource.TMENDRSCAN);
+ assertEquals(xids.length, 0);
+ clientSession.rollback(xid);
+ clientSession.commit(xid2, true);
+ clientSession.start();
+ ClientMessage m = clientConsumer.receive(1000);
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = clientConsumer.receive(1000);
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m2");
+ m = clientConsumer.receive(1000);
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = clientConsumer.receive(1000);
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ m = clientConsumer.receive(1000);
+ assertNull(m);
+ }
+
+ public void testMultipleTxSameXidSendWithCommit(boolean stopServer) throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ ClientMessage m5 = createTextMessage("m5");
+ ClientMessage m6 = createTextMessage("m6");
+ ClientMessage m7 = createTextMessage("m7");
+ ClientMessage m8 = createTextMessage("m8");
+ ClientSession clientSession2 = sessionFactory.createSession(true, false, true, 1, false);
+ ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
clientSession2.start(xid, XAResource.TMNOFLAGS);
clientProducer2.send(m1);
clientProducer2.send(m2);
@@ -334,7 +587,14 @@
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- stopAndRestartServer();
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
@@ -372,7 +632,7 @@
assertEquals(m.getBody().getString(), "m8");
}
- public void testBasicReceiveWithCommit() throws Exception
+ public void testBasicReceiveWithCommit(boolean stopServer) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
ClientMessage m1 = createTextMessage("m1");
@@ -406,8 +666,16 @@
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- stopAndRestartServer();
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
+
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
assertEquals(xids.length, 1);
@@ -422,7 +690,7 @@
assertNull(m);
}
- public void testBasicReceiveWithRollback() throws Exception
+ public void testBasicReceiveWithRollback(boolean stopServer) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
ClientMessage m1 = createTextMessage("m1");
@@ -456,8 +724,16 @@
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- stopAndRestartServer();
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
+
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
assertEquals(xids.length, 1);
@@ -482,7 +758,7 @@
assertEquals(m.getBody().getString(), "m4");
}
- public void testMultipleTxReceiveWithCommit() throws Exception
+ public void testMultipleTxReceiveWithCommit(boolean stopServer) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
Xid xid2 = new XidImpl("xa2".getBytes(), 1, new GUID().toString().getBytes());
@@ -552,8 +828,16 @@
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- stopAndRestartServer();
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
+
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
assertEqualXids(xids, xid, xid2);
xids = clientSession.recover(XAResource.TMENDRSCAN);
@@ -564,6 +848,110 @@
assertNull(m);
}
+ public void testMultipleTxReceiveWithRollback(boolean stopServer) throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+ Xid xid2 = new XidImpl("xa2".getBytes(), 1, new GUID().toString().getBytes());
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ ClientMessage m5 = createTextMessage("m5");
+ ClientMessage m6 = createTextMessage("m6");
+ ClientMessage m7 = createTextMessage("m7");
+ ClientMessage m8 = createTextMessage("m8");
+ ClientSession clientSession2 = sessionFactory.createSession(false, true, true, 1, false);
+ ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+ SimpleString anewtestq = new SimpleString("anewtestq");
+ clientSession.createQueue(anewtestq, anewtestq, null, true, true);
+ ClientProducer clientProducer3 = clientSession2.createProducer(anewtestq);
+ clientProducer2.send(m1);
+ clientProducer2.send(m2);
+ clientProducer2.send(m3);
+ clientProducer2.send(m4);
+ clientProducer3.send(m5);
+ clientProducer3.send(m6);
+ clientProducer3.send(m7);
+ clientProducer3.send(m8);
+ clientSession2.close();
+ clientSession2 = sessionFactory.createSession(true, false, false, 1, false);
+ ClientConsumer clientConsumer2 = clientSession2.createConsumer(anewtestq);
+ clientSession2.start(xid2, XAResource.TMNOFLAGS);
+ clientSession2.start();
+ ClientMessage m = clientConsumer2.receive(1000);
+ clientSession2.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m5");
+ m = clientConsumer2.receive(1000);
+ assertNotNull(m);
+ clientSession2.acknowledge();
+ assertEquals(m.getBody().getString(), "m6");
+ m = clientConsumer2.receive(1000);
+ clientSession2.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m7");
+ m = clientConsumer2.receive(1000);
+ clientSession2.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m8");
+ clientSession2.end(xid2, XAResource.TMSUCCESS);
+ clientSession2.prepare(xid2);
+ clientSession2.close();
+ clientSession2 = null;
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.start();
+ m = clientConsumer.receive(1000);
+ clientSession.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = clientConsumer.receive(1000);
+ assertNotNull(m);
+ clientSession.acknowledge();
+ assertEquals(m.getBody().getString(), "m2");
+ m = clientConsumer.receive(1000);
+ clientSession.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = clientConsumer.receive(1000);
+ clientSession.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ if (stopServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
+
+ Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+ assertEqualXids(xids, xid, xid2);
+ xids = clientSession.recover(XAResource.TMENDRSCAN);
+ assertEquals(xids.length, 0);
+ clientSession.rollback(xid);
+ clientSession.start();
+ m = clientConsumer.receive(1000);
+ clientSession.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = clientConsumer.receive(1000);
+ assertNotNull(m);
+ clientSession.acknowledge();
+ assertEquals(m.getBody().getString(), "m2");
+ m = clientConsumer.receive(1000);
+ clientSession.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = clientConsumer.receive(1000);
+ clientSession.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ }
+
protected void stopAndRestartServer() throws Exception
{
//now stop and start the server
@@ -576,6 +964,13 @@
createClients();
}
+ protected void recreateClients() throws Exception
+ {
+ clientSession.close();
+ clientSession = null;
+ createClients();
+ }
+
private ClientMessage createTextMessage(String s)
{
ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, true, 0, System.currentTimeMillis(), (byte) 1);
@@ -587,7 +982,7 @@
private void createClients()
throws MessagingException
{
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
clientSession = sessionFactory.createSession(true, false, true, 1, false);
clientProducer = clientSession.createProducer(atestq);
clientConsumer = clientSession.createConsumer(atestq);
More information about the jboss-cvs-commits
mailing list