[jboss-cvs] JBoss Messaging SVN: r6110 - in trunk: src/main/org/jboss/messaging/core/server/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 18 17:49:31 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-18 17:49:31 -0400 (Wed, 18 Mar 2009)
New Revision: 6110
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XAMultiThreadRandomFailoverTest.java
Modified:
trunk/build-messaging.xml
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
Log:
XA Tests, JBMESSAGING-1548 (Empty XID and Rollback fix), XAFailoverTests and other small pieces
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/build-messaging.xml 2009-03-18 21:49:31 UTC (rev 6110)
@@ -286,32 +286,51 @@
<path refid="jboss.jbossxb.classpath"/>
</path>
- <path id="unit.test.execution.classpath">
- <!-- ensure that the core client jar is included for
- tests security tests which needs to read version.properties
- from inside this jar
- -->
- <fileset dir="${build.jars.dir}">
- <include name="${core.client.jar.name}"/>
- </fileset>
- <pathelement location="${test.dir}/config"/>
- <pathelement location="${test.dir}/tmpfiles"/>
- <pathelement location="${test.classes.dir}"/>
- <pathelement location="${src.config.dir}"/>
- <pathelement location="${src.schemas.dir}"/>
- <path refid="test.compilation.classpath"/>
- <path refid="oswego.concurrent.classpath"/>
- <path refid="apache.log4j.classpath"/>
- <path refid="cglib.classpath"/>
- <path refid="jboss.common.core.classpath"/>
- <path refid="jboss.aop.classpath"/>
- <path refid="trove.trove.classpath"/>
- <path refid="javassist.classpath"/>
- <path refid="jboss.jbossxb.classpath"/>
- <path refid="apache.xerces.classpath"/>
- <path refid="apache.logging.classpath"/>
- </path>
+ <path id="unit.test.execution.classpath">
+ <!-- ensure that the core client jar is included for
+ tests security tests which needs to read version.properties
+ from inside this jar
+ -->
+ <fileset dir="${build.jars.dir}">
+ <include name="${core.client.jar.name}"/>
+ </fileset>
+ <pathelement location="${test.dir}/config"/>
+ <pathelement location="${test.dir}/tmpfiles"/>
+ <pathelement location="${test.classes.dir}"/>
+ <pathelement location="${src.config.dir}"/>
+ <pathelement location="${src.schemas.dir}"/>
+ <path refid="test.compilation.classpath"/>
+ <path refid="oswego.concurrent.classpath"/>
+ <path refid="apache.log4j.classpath"/>
+ <path refid="cglib.classpath"/>
+ <path refid="jboss.common.core.classpath"/>
+ <path refid="jboss.aop.classpath"/>
+ <path refid="trove.trove.classpath"/>
+ <path refid="javassist.classpath"/>
+ <path refid="jboss.jbossxb.classpath"/>
+ <path refid="apache.xerces.classpath"/>
+ <path refid="apache.logging.classpath"/>
+ </path>
+ <path id="emma.unit.test.execution.classpath">
+ <pathelement location="${test.dir}/config"/>
+ <pathelement location="${test.dir}/tmpfiles"/>
+ <pathelement location="${test.classes.dir}"/>
+ <pathelement location="${src.config.dir}"/>
+ <pathelement location="${src.schemas.dir}"/>
+ <path refid="test.compilation.classpath"/>
+ <path refid="oswego.concurrent.classpath"/>
+ <path refid="apache.log4j.classpath"/>
+ <path refid="cglib.classpath"/>
+ <path refid="jboss.common.core.classpath"/>
+ <path refid="jboss.aop.classpath"/>
+ <path refid="trove.trove.classpath"/>
+ <path refid="javassist.classpath"/>
+ <path refid="jboss.jbossxb.classpath"/>
+ <path refid="apache.xerces.classpath"/>
+ <path refid="apache.logging.classpath"/>
+ </path>
+
<path id="jms.test.execution.classpath">
<pathelement location="${test.dir}/config"/>
<pathelement location="${src.config.dir}"/>
@@ -1372,7 +1391,7 @@
<path id="emma.execution.classpath">
<path refid="emma.lib"/>
- <path refid="unit.test.execution.classpath"/>
+ <path refid="emma.unit.test.execution.classpath"/>
</path>
<echo message=""/>
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -1560,6 +1560,7 @@
if (!msgs.contains(msg))
{
store.addSize(-msg.getMemoryEstimate());
+ msg.decrementRefCount();
}
msgs.add(msg);
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -195,7 +195,7 @@
}
}
- if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
+ if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED) )
{
storageManager.commit(id);
}
@@ -336,7 +336,7 @@
private void doRollback() throws Exception
{
- if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || xid != null)
+ if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED))
{
storageManager.rollback(id);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -29,6 +29,9 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientFileMessage;
@@ -77,7 +80,8 @@
// Protected -----------------------------------------------------
- protected void testChunks(final boolean realFiles,
+ protected void testChunks(final boolean isXA,
+ final boolean realFiles,
final boolean useFile,
final boolean preAck,
final boolean sendingBlocking,
@@ -87,7 +91,8 @@
final int waitOnConsumer,
final long delayDelivery) throws Exception
{
- testChunks(realFiles,
+ testChunks(isXA,
+ realFiles,
useFile,
preAck,
sendingBlocking,
@@ -102,7 +107,8 @@
false);
}
- protected void testChunks(final boolean realFiles,
+ protected void testChunks(final boolean isXA,
+ final boolean realFiles,
final boolean useFile,
final boolean preAck,
final boolean sendingBlocking,
@@ -116,7 +122,6 @@
final int minSizeConsumer,
final boolean testTime) throws Exception
{
-
clearData();
messagingService = createService(realFiles);
@@ -139,67 +144,51 @@
}
sf.setMinLargeMessageSize(minSizeProducer);
+
- ClientSession session = sf.createSession(null, null, false, true, false, preAck, 0);
+ ClientSession session;
+ Xid xid = null;
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
session.createQueue(ADDRESS, ADDRESS, null, true, false);
ClientProducer producer = session.createProducer(ADDRESS);
- if (useFile)
+ sendMessages(useFile, numberOfMessages, numberOfIntegers, delayDelivery, testTime, session, producer);
+
+ if (isXA)
{
- File tmpData = createLargeFile(getTemporaryDir(), "someFile.dat", numberOfIntegers);
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage message = session.createFileMessage(true);
- ((ClientFileMessage)message).setFile(tmpData);
- message.putIntProperty(new SimpleString("counter-message"), i);
- long timeStart = System.currentTimeMillis();
- if (delayDelivery > 0)
- {
- long time = System.currentTimeMillis();
- message.putLongProperty(new SimpleString("original-time"), time);
- message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+ validateNoFilesOnLargeDir();
- producer.send(message);
- }
- else
- {
- producer.send(message);
- }
+ sendMessages(useFile, numberOfMessages, numberOfIntegers, delayDelivery, testTime, session, producer);
- if (testTime)
- {
- System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
- }
- }
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
}
else
{
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage message = session.createClientMessage(true);
- message.putIntProperty(new SimpleString("counter-message"), i);
- message.setBody(createLargeBuffer(numberOfIntegers));
- long timeStart = System.currentTimeMillis();
- if (delayDelivery > 0)
- {
- long time = System.currentTimeMillis();
- message.putLongProperty(new SimpleString("original-time"), time);
- message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
-
- producer.send(message);
- }
- else
- {
- producer.send(message);
- }
- if (testTime)
- {
- System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
- }
- }
+ session.commit();
}
session.close();
@@ -217,7 +206,13 @@
sf.setMinLargeMessageSize(minSizeConsumer);
- session = sf.createSession(null, null, false, true, true, preAck, 0);
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
ClientConsumer consumer = null;
@@ -271,6 +266,19 @@
message.acknowledge();
}
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+
+
assertNotNull(message);
if (delayDelivery <= 0)
@@ -302,7 +310,17 @@
if (iteration == 0)
{
consumer.close();
- session.rollback();
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
}
}
@@ -330,6 +348,85 @@
}
}
+ /**
+ * @param useFile
+ * @param numberOfMessages
+ * @param numberOfIntegers
+ * @param delayDelivery
+ * @param testTime
+ * @param session
+ * @param producer
+ * @throws FileNotFoundException
+ * @throws IOException
+ * @throws MessagingException
+ */
+ private void sendMessages(final boolean useFile,
+ final int numberOfMessages,
+ final int numberOfIntegers,
+ final long delayDelivery,
+ final boolean testTime,
+ ClientSession session,
+ ClientProducer producer) throws FileNotFoundException, IOException, MessagingException
+ {
+ if (useFile)
+ {
+ File tmpData = createLargeFile(getTemporaryDir(), "someFile.dat", numberOfIntegers);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createFileMessage(true);
+ ((ClientFileMessage)message).setFile(tmpData);
+ message.putIntProperty(new SimpleString("counter-message"), i);
+ long timeStart = System.currentTimeMillis();
+ if (delayDelivery > 0)
+ {
+ long time = System.currentTimeMillis();
+ message.putLongProperty(new SimpleString("original-time"), time);
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+
+ producer.send(message);
+ }
+ else
+ {
+ producer.send(message);
+ }
+ if (testTime)
+ {
+ System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+ }
+ }
+
+
+ }
+ else
+ {
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+ message.putIntProperty(new SimpleString("counter-message"), i);
+ message.setBody(createLargeBuffer(numberOfIntegers));
+ long timeStart = System.currentTimeMillis();
+ if (delayDelivery > 0)
+ {
+ long time = System.currentTimeMillis();
+ message.putLongProperty(new SimpleString("original-time"), time);
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+
+ producer.send(message);
+ }
+ else
+ {
+ producer.send(message);
+ }
+
+ if (testTime)
+ {
+ System.out.println("Message sent in " + (System.currentTimeMillis() - timeStart));
+ }
+ }
+ }
+ }
+
protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
{
MessagingBuffer body = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
@@ -345,9 +442,14 @@
protected ClientFileMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers) throws Exception
{
+ return createLargeClientMessage(session, numberOfIntegers, true);
+ }
- ClientFileMessage clientMessage = session.createFileMessage(true);
+ protected ClientFileMessage createLargeClientMessage(final ClientSession session, final int numberOfIntegers, boolean persistent) throws Exception
+ {
+ ClientFileMessage clientMessage = session.createFileMessage(persistent);
+
File tmpFile = createLargeFile(getTemporaryDir(), "tmpUpload.data", numberOfIntegers);
clientMessage.setFile(tmpFile);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -26,6 +26,9 @@
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import junit.framework.AssertionFailedError;
import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -284,34 +287,64 @@
public void testMessageChunkFilePersistence() throws Exception
{
- testChunks(true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
}
+ public void testMessageChunkFilePersistenceXA() throws Exception
+ {
+ testChunks(true, true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
public void testMessageChunkFilePersistenceBlocked() throws Exception
{
- testChunks(true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
}
+ public void testMessageChunkFilePersistenceBlockedXA() throws Exception
+ {
+ testChunks(true, true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
public void testMessageChunkFilePersistenceBlockedPreACK() throws Exception
{
- testChunks(true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
}
+ public void testMessageChunkFilePersistenceBlockedPreACKXA() throws Exception
+ {
+ testChunks(true, true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
public void testMessageChunkFilePersistenceDelayed() throws Exception
{
- testChunks(true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
+ testChunks(false, true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
}
+ public void testMessageChunkFilePersistenceDelayedXA() throws Exception
+ {
+ testChunks(true, true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
+ }
+
public void testMessageChunkNullPersistence() throws Exception
{
- testChunks(false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
}
+ public void testMessageChunkNullPersistenceXA() throws Exception
+ {
+ testChunks(true, false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
public void testMessageChunkNullPersistenceDelayed() throws Exception
{
- testChunks(false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
+ testChunks(false, false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
}
+ public void testMessageChunkNullPersistenceDelayedXA() throws Exception
+ {
+ testChunks(true, false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
+ }
+
public void testPageOnLargeMessage() throws Exception
{
testPageOnLargeMessage(true, false);
@@ -325,46 +358,85 @@
public void testSendfileMessage() throws Exception
{
- testChunks(true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendfileMessageXA() throws Exception
+ {
+ testChunks(true, true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
}
public void testSendfileMessageOnNullPersistence() throws Exception
{
- testChunks(false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
}
+ public void testSendfileMessageOnNullPersistenceXA() throws Exception
+ {
+ testChunks(true, false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
{
- testChunks(false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
}
+ public void testSendfileMessageOnNullPersistenceSmallMessageXA() throws Exception
+ {
+ testChunks(true, false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
public void testSendfileMessageSmallMessage() throws Exception
{
- testChunks(true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
+ }
+ public void testSendfileMessageSmallMessageXA() throws Exception
+ {
+ testChunks(true, true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
}
public void testSendRegularMessageNullPersistence() throws Exception
{
- testChunks(false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
}
+ public void testSendRegularMessageNullPersistenceXA() throws Exception
+ {
+ testChunks(true, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
public void testSendRegularMessageNullPersistenceDelayed() throws Exception
{
- testChunks(false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+ testChunks(false, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
}
+ public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception
+ {
+ testChunks(true, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
public void testSendRegularMessagePersistence() throws Exception
{
- testChunks(true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ testChunks(false, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
}
+ public void testSendRegularMessagePersistenceXA() throws Exception
+ {
+ testChunks(true, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
public void testSendRegularMessagePersistenceDelayed() throws Exception
{
- testChunks(true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+ testChunks(false, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
}
+ public void testSendRegularMessagePersistenceDelayedXA() throws Exception
+ {
+ testChunks(true, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000); // first argument is isXA: must be true so this variant really runs through the XA path
+ }
+
public void testTwoBindingsTwoStartedConsumers() throws Exception
{
// there are two bindings.. one is ACKed, the other is not, the server is restarted
@@ -521,9 +593,77 @@
}
}
+
+ public void testSendRollback() throws Exception
+ {
+ clearData();
+
+ boolean isXA = false;
+
+ messagingService = createService(true);
+ messagingService.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(isXA, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ Xid xid = null;
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+
+ Message clientFile = createLargeClientMessage(session, 50000, false);
+
+ for (int i = 0; i < 1; i++)
+ {
+ producer.send(clientFile);
+ }
+
+
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.rollback(xid);
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+
+ messagingService.stop();
+
+ }
+
+
public void testSimpleRollback() throws Exception
{
+ simpleRollbackInternalTest(false);
+ }
+
+ public void testSimpleRollbackXA() throws Exception
+ {
+ simpleRollbackInternalTest(true);
+ }
+
+
+ public void simpleRollbackInternalTest(boolean isXA) throws Exception
+ {
// there are two bindings.. one is ACKed, the other is not, the server is restarted
// The other binding is acked... The file must be deleted
@@ -538,14 +678,20 @@
ClientSessionFactory sf = createInVMFactory();
- ClientSession session = sf.createSession(false, false, false);
+ ClientSession session = sf.createSession(isXA, false, false);
+
+ Xid xid = null;
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
session.createQueue(ADDRESS, ADDRESS, null, true, false);
- int numberOfIntegers = 100;
+ int numberOfIntegers = 50000;
- Message clientFile = createLargeClientMessage(session, numberOfIntegers);
-
session.start();
log.info ("Session started");
@@ -556,12 +702,40 @@
for (int n = 0; n < 10; n++)
{
+ Message clientFile = createLargeClientMessage(session, numberOfIntegers, n%2 == 0);
+
producer.send(clientFile);
assertNull(consumer.receiveImmediate());
- session.commit();
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+ producer.send(clientFile);
+
+ assertNull(consumer.receiveImmediate());
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+
for (int i = 0; i < 2; i++)
{
@@ -573,13 +747,34 @@
clientMessage.acknowledge();
- if (i == 0)
+ if (isXA)
{
- session.rollback();
+ if (i == 0)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
}
else
{
- session.commit();
+ if (i == 0)
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.commit();
+ }
}
}
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -0,0 +1,292 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * A MultiThreadFailoverSupport
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Mar 17, 2009 11:15:02 AM
+ *
+ *
+ */
+public abstract class MultiThreadFailoverSupport extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ // Attributes ----------------------------------------------------
+
+ protected Timer timer;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected abstract void start() throws Exception;
+
+ protected abstract void stop() throws Exception;
+
+ protected abstract ClientSessionFactoryInternal createSessionFactory();
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ timer = new Timer();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ timer.cancel();
+ super.tearDown();
+ }
+
+ protected boolean shouldFail()
+ {
+ return true;
+ }
+
+
+
+ protected void runMultipleThreadsFailoverTest(final RunnableT runnable,
+ final int numThreads,
+ final int numIts,
+ final boolean failOnCreateConnection,
+ final long failDelay) throws Exception
+ {
+ for (int its = 0; its < numIts; its++)
+ {
+ log.info("************ ITERATION: " + its);
+
+ start();
+
+ final ClientSessionFactoryInternal sf = createSessionFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ Failer failer = startFailer(failDelay, session, failOnCreateConnection);
+
+ class Runner extends Thread // runs one RunnableT invocation on its own thread and records any failure
+ {
+ private volatile Throwable throwable; // set in run() on failure; read by the launching thread after join()
+
+ private final RunnableT test;
+
+ private final int threadNum; // index handed to test.run(...) so the test can tell the threads apart
+
+ Runner(final RunnableT test, final int threadNum)
+ {
+ this.test = test;
+
+ this.threadNum = threadNum;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ test.run(sf, threadNum);
+ }
+ catch (Throwable t)
+ {
+ throwable = t;
+
+ log.error("Failed to run test", t);
+
+ // If a failure happened here, print a thread dump to help diagnose it.
+ // It is sent to System.out so that it shows up in the test report.
+ System.out.println(threadDump(" - fired by MultiThreadFailoverSupport::runMultipleThreadsFailoverTest (" + t.getLocalizedMessage() + ")"));
+ }
+ }
+ }
+
+ do
+ {
+ List<Runner> threads = new ArrayList<Runner>();
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ Runner runner = new Runner(runnable, i);
+
+ threads.add(runner);
+
+ runner.start();
+ }
+
+ for (Runner thread : threads)
+ {
+ thread.join();
+
+ if (thread.throwable != null)
+ {
+ throw new Exception("Exception on thread " + thread, thread.throwable);
+ }
+ }
+
+ log.info("completed loop");
+
+ runnable.checkFail();
+
+ }
+ while (!failer.isExecuted());
+
+ InVMConnector.resetFailures();
+
+ log.info("closing session");
+ session.close();
+ log.info("closed session");
+
+ assertEquals(0, sf.numSessions());
+
+ assertEquals(0, sf.numConnections());
+
+ log.info("stopping");
+ stop();
+ log.info("stopped");
+ }
+ }
+
+
+ // Private -------------------------------------------------------
+
+ private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
+ {
+ Failer failer = new Failer(session, failOnCreateConnection);
+
+ // This is useful for debugging.. just change shouldFail to return false, and Failer will not be executed
+ if (shouldFail())
+ {
+ timer.schedule(failer, (long)(time * Math.random()), 100);
+ }
+
+ return failer;
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+
+ protected abstract class RunnableT extends Thread
+ {
+ private volatile String failReason;
+
+ private volatile Throwable throwable;
+
+ public void setFailed(final String reason, final Throwable throwable)
+ {
+ failReason = reason;
+ this.throwable = throwable;
+ }
+
+ public void checkFail()
+ {
+ if (throwable != null)
+ {
+ log.error("Test failed: " + failReason, throwable);
+ }
+ if (failReason != null)
+ {
+ fail(failReason);
+ }
+ }
+
+ public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
+ }
+
+
+
+ private class Failer extends TimerTask
+ {
+ private final ClientSession session;
+
+ private boolean executed;
+
+ private final boolean failOnCreateConnection;
+
+ public Failer(final ClientSession session, final boolean failOnCreateConnection)
+ {
+ this.session = session;
+
+ this.failOnCreateConnection = failOnCreateConnection;
+ }
+
+ @Override
+ public synchronized void run()
+ {
+ log.info("** Failing connection");
+
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+ if (failOnCreateConnection)
+ {
+ InVMConnector.numberOfFailures = 1;
+ InVMConnector.failOnCreateConnection = true;
+ }
+ else
+ {
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+ }
+
+ log.info("** Fail complete");
+
+ cancel();
+
+ executed = true;
+ }
+
+ public synchronized boolean isExecuted()
+ {
+ log.info("executed??" + executed);
+ return executed;
+ }
+ }
+
+
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -22,14 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.failover;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -41,17 +38,13 @@
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.jms.client.JBossBytesMessage;
import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -62,7 +55,7 @@
*
*
*/
-public abstract class MultiThreadRandomFailoverTestBase extends UnitTestCase
+public abstract class MultiThreadRandomFailoverTestBase extends MultiThreadFailoverSupport
{
private final Logger log = Logger.getLogger(getClass());
@@ -73,7 +66,7 @@
private final int LATCH_WAIT = getLatchWait();
- private static final int NUM_THREADS = 10;
+ private int NUM_THREADS = getNumThreads();
// Attributes ----------------------------------------------------
protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
@@ -233,7 +226,7 @@
{
doTestL(sf);
}
- }, NUM_THREADS, false, true, 10);
+ }, NUM_THREADS, true, 10);
}
// public void testM() throws Exception
@@ -268,6 +261,11 @@
protected abstract void setBody(ClientMessage message) throws Exception;
protected abstract boolean checkSize(ClientMessage message);
+
+ protected int getNumThreads() // seeds the NUM_THREADS field; subclasses may override to change the thread count
+ {
+ return 10;
+ }
protected ClientSession createAutoCommitSession(ClientSessionFactory sf) throws Exception
{
@@ -1333,128 +1331,22 @@
// Private -------------------------------------------------------
- private void runTestMultipleThreads(final RunnableT runnable, final int numThreads, final boolean fileBased) throws Exception
- {
- runTestMultipleThreads(runnable, numThreads, fileBased, false);
- }
-
private void runTestMultipleThreads(final RunnableT runnable,
final int numThreads,
- final boolean fileBased,
final boolean failOnCreateConnection) throws Exception
{
- this.runTestMultipleThreads(runnable, numThreads, fileBased, failOnCreateConnection, 1000);
+ this.runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
}
private void runTestMultipleThreads(final RunnableT runnable,
final int numThreads,
- final boolean fileBased,
final boolean failOnCreateConnection,
final long failDelay) throws Exception
{
- final int numIts = getNumIterations();
-
- for (int its = 0; its < numIts; its++)
- {
- log.info("************ ITERATION: " + its);
-
- start();
-
- final ClientSessionFactoryInternal sf = createSessionFactory();
-
- final ClientSession session = sf.createSession(false, true, true);
-
- Failer failer = startFailer(failDelay, session, failOnCreateConnection);
-
- class Runner extends Thread
- {
- private volatile Throwable throwable;
-
- private final RunnableT test;
-
- private final int threadNum;
-
- Runner(final RunnableT test, final int threadNum)
- {
- this.test = test;
-
- this.threadNum = threadNum;
- }
-
- @Override
- public void run()
- {
- try
- {
- test.run(sf, threadNum);
- }
- catch (Throwable t)
- {
- throwable = t;
- // Case a failure happened here, it should print the Thread dump
- // Sending it to System.out, as it would show on the Tests report
- System.out.println(threadDump(" - fired by MultiThreadRandomFailoverTestBase::runTestMultipleThreads"));
-
- log.error("Failed to run test", t);
- }
- }
- }
-
- do
- {
- List<Runner> threads = new ArrayList<Runner>();
-
- for (int i = 0; i < numThreads; i++)
- {
- Runner runner = new Runner(runnable, i);
-
- threads.add(runner);
-
- runner.start();
- }
-
- for (Runner thread : threads)
- {
- thread.join();
-
- if (thread.throwable != null)
- {
- throw new Exception("Exception on thread " + thread, thread.throwable);
- }
- }
-
- log.info("completed loop");
-
- runnable.checkFail();
-
- }
- while (!failer.isExecuted());
-
- InVMConnector.resetFailures();
-
- log.info("closing session");
- session.close();
- log.info("closed session");
-
- assertEquals(0, sf.numSessions());
-
- assertEquals(0, sf.numConnections());
-
- log.info("stopping");
- stop();
- log.info("stopped");
- }
+
+ runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(), failOnCreateConnection, failDelay);
}
- private Failer startFailer(final long time, final ClientSession session, final boolean failOnCreateConnection)
- {
- Failer failer = new Failer(session, failOnCreateConnection);
-
- timer.schedule(failer, (long)(time * Math.random()), 100);
-
- return failer;
- }
-
/**
* @return
*/
@@ -1472,7 +1364,7 @@
return sf;
}
- private void stop() throws Exception
+ protected void stop() throws Exception
{
backupService.stop();
@@ -1552,79 +1444,6 @@
// Inner classes -------------------------------------------------
- private class Failer extends TimerTask
- {
- private final ClientSession session;
-
- private boolean executed;
-
- private final boolean failOnCreateConnection;
-
- public Failer(final ClientSession session, final boolean failOnCreateConnection)
- {
- this.session = session;
-
- this.failOnCreateConnection = failOnCreateConnection;
- }
-
- @Override
- public synchronized void run()
- {
- log.info("** Failing connection");
-
- RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
-
- if (failOnCreateConnection)
- {
- InVMConnector.numberOfFailures = 1;
- InVMConnector.failOnCreateConnection = true;
- }
- else
- {
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
- }
-
- log.info("** Fail complete");
-
- cancel();
-
- executed = true;
- }
-
- public synchronized boolean isExecuted()
- {
- log.info("executed??" + executed);
- return executed;
- }
- }
-
- private abstract class RunnableT extends Thread
- {
- private volatile String failReason;
-
- private volatile Throwable throwable;
-
- public void setFailed(final String reason, final Throwable throwable)
- {
- failReason = reason;
- this.throwable = throwable;
- }
-
- public void checkFail()
- {
- if (throwable != null)
- {
- log.error("Test failed: " + failReason, throwable);
- }
- if (failReason != null)
- {
- fail(failReason);
- }
- }
-
- public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
- }
-
private class MyHandler implements MessageHandler
{
CountDownLatch latch = new CountDownLatch(1);
@@ -1647,7 +1466,7 @@
failure = null;
- latch = new CountDownLatch(1);;
+ latch = new CountDownLatch(1);
}
MyHandler(final int threadNum, final int numMessages)
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -0,0 +1,150 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.Messaging;
+
+/**
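+ * An XALargeMessageMultiThreadFailoverTest: runs the XAMultiThreadRandomFailoverTest scenarios with message bodies larger than the session factory's minimum large-message size.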
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Jan 18, 2009 4:52:09 PM
+ *
+ *
+ */
+public class XALargeMessageMultiThreadFailoverTest extends XAMultiThreadRandomFailoverTest
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ private final byte[] BODY = new byte[500]; // payload written by setBody; its length is what checkSize expects
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+ @Override
+ protected ClientSessionFactoryInternal createSessionFactory()
+ {
+ ClientSessionFactoryInternal sf = super.createSessionFactory();
+ sf.setMinLargeMessageSize(200); // 200 < BODY.length (500), so presumably every test message is treated as large - TODO confirm
+ return sf;
+
+ }
+
+ @Override
+ protected void start() throws Exception
+ {
+ // Starts a backup server and then a live server, each storing its data in its own sub-directory of getTestDir()
+ deleteDirectory(new File(getTestDir()));
+
+ Configuration backupConf = new ConfigurationImpl();
+
+ backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+ backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+ backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+ backupConf.setJournalType(JournalType.NIO);
+
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ // The backup acceptor is configured with backupParams (server id 1); the live acceptor below takes no parameters
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+ backupConf.setBackup(true);
+
+ backupService = Messaging.newMessagingService(backupConf);
+ backupService.start();
+ // Live server: same kind of storage settings under "/live", plus a connector that refers to the backup
+ Configuration liveConf = new ConfigurationImpl();
+
+ liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+ liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+ liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+ liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+ liveConf.setJournalFileSize(100 * 1024);
+
+ liveConf.setJournalType(JournalType.NIO);
+
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ // The connector is built from backupParams and registered as the live server's backup connector
+ TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+ backupParams,
+ "backup-connector");
+ connectors.put(backupTC.getName(), backupTC);
+ liveConf.setConnectorConfigurations(connectors);
+ liveConf.setBackupConnectorName(backupTC.getName());
+ liveService = Messaging.newMessagingService(liveConf);
+
+ liveService.start();
+
+ }
+
+ @Override
+ protected void setBody(final ClientMessage message) throws Exception
+ {
+ // Writes the whole BODY array into the message body
+ message.getBody().writeBytes(BODY);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.tests.integration.cluster.failover.XAMultiThreadRandomFailoverTest#checkSize(org.jboss.messaging.core.client.ClientMessage)
+ */
+ @Override
+ protected boolean checkSize(final ClientMessage message)
+ {
+ return BODY.length == message.getBodySize();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XAMultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XAMultiThreadRandomFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XAMultiThreadRandomFailoverTest.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -0,0 +1,896 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors by
+ * the @authors tag. See the copyright.txt in the distribution for a full listing of individual contributors. This is
+ * free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version.
+ * This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details. You should have received a copy of the GNU Lesser General Public License along with this software; if not,
+ * write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * An XAMultiThreadRandomFailoverTest: multi-threaded random failover test that sends and consumes messages inside XA transactions.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class XAMultiThreadRandomFailoverTest extends MultiThreadFailoverSupport
+{
+ protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ private static final int RECEIVE_TIMEOUT = 30000;
+
+ private final Logger log = Logger.getLogger(getClass());
+
+ protected MessagingService liveService;
+
+ protected MessagingService backupService;
+
+ protected final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ protected Map<ClientSession, Xid> xids;
+
+ private int NUM_THREADS = getNumThreads();
+
+ private final int LATCH_WAIT = getLatchWait();
+
+ private final int NUM_SESSIONS = getNumSessions();
+
+ protected int getNumSessions() // seeds NUM_SESSIONS (used by doTestC) and is called directly by doTestG; overridable
+ {
+ return 10;
+ }
+
+ protected int getLatchWait() // seeds LATCH_WAIT, the timeout in milliseconds used when awaiting handler latches
+ {
+ return 20000;
+ }
+
+ protected int getNumThreads() // seeds NUM_THREADS, the thread count the test methods pass to runTestMultipleThreads
+ {
+ return 10;
+ }
+
+ protected int getNumIterations() // iteration count that runTestMultipleThreads hands to runMultipleThreadsFailoverTest
+ {
+ return 2;
+ }
+
+ protected boolean shouldFail() // not referenced in the code reviewed here - presumably consulted elsewhere; TODO confirm
+ {
+ return true;
+ }
+
+ protected ClientSession createTransactionalSession(ClientSessionFactory sf) throws Exception
+ {
+ // Session used for the XA work in these tests; the leading 'true' is presumably the XA flag - TODO confirm
+ return sf.createSession(true, false, false);
+ }
+
+ public void testC() throws Exception // drives doTestC through runTestMultipleThreads
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestC(sf, threadNum);
+ }
+ }, NUM_THREADS, false, 3000); // failOnCreateConnection = false, failDelay = 3000
+ }
+
+ public void testD() throws Exception // drives doTestD through runTestMultipleThreads
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestD(sf, threadNum);
+ }
+ }, NUM_THREADS, false, 3000); // failOnCreateConnection = false, failDelay = 3000
+ }
+
+ public void testG() throws Exception // drives doTestG through runTestMultipleThreads
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestG(sf, threadNum);
+ }
+ }, NUM_THREADS, false, 3000); // failOnCreateConnection = false, failDelay = 3000
+ }
+
+ public void testH() throws Exception // drives doTestH through runTestMultipleThreads
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestH(sf, threadNum);
+ }
+ }, NUM_THREADS, false, 3000); // failOnCreateConnection = false, failDelay = 3000
+ }
+
+ protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+ // Handler-based scenario: consumers are started before sending; a first pass without commit, then a second pass with commit
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = NUM_SESSIONS;
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+ // One session, queue, consumer and handler per subscription; queue names are prefixed with threadNum
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ ClientSession sessConsume = createTransactionalSession(sf);
+
+ sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ MyHandler handler = new MyHandler(threadNum, numMessages, sessConsume, consumer);
+ // First pass: do not commit on completion (presumably the handler rolls back instead - TODO confirm in MyHandler)
+ handler.setCommitOnComplete(false);
+
+ handler.start();
+
+ handlers.add(handler);
+ }
+
+ ClientSession sessSend = createTransactionalSession(sf);
+
+ transactionallySendMessages(threadNum, numMessages, sessSend);
+ // Wait up to LATCH_WAIT ms per handler for the first pass; a timeout or a recorded failure aborts the test
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+ }
+ // Second pass: restart the same handlers, this time committing on completion
+ for (MyHandler handler : handlers)
+ {
+ handler.setCommitOnComplete(true);
+ handler.start();
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ assertTrue(ok);
+ }
+
+ sessSend.close();
+
+ for (MyHandler handler : handlers)
+ {
+ handler.close();
+ }
+ // Queues are deleted through the separate session s after the handlers are closed
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+ // Failures recorded during the second pass are reported only here, after cleanup
+ for (MyHandler handler : handlers)
+ {
+ if (handler.failure != null)
+ {
+ fail(handler.failure);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+ }
+
+ protected void doTestD(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+ // Handler-based scenario in which the handlers are started only after the messages have been sent
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+ // Honour the overridable session count, as doTestC and doTestG do (the default is still 10)
+ final int numSessions = getNumSessions();
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+ // One session, queue, consumer and handler per subscription; note the queue names here contain a space (" sub")
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + " sub" + i);
+
+ ClientSession sessConsume = createTransactionalSession(sf);
+
+ sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ MyHandler handler = new MyHandler(threadNum, numMessages, sessConsume, consumer);
+
+ handlers.add(handler);
+ }
+
+ ClientSession sessSend = createTransactionalSession(sf);
+
+ transactionallySendMessages(threadNum, numMessages, sessSend);
+
+ for (MyHandler handler : handlers)
+ {
+ handler.session.start();
+ }
+ // First pass: setCommitOnComplete is not called on these handlers, so they run with their default setting
+ for (MyHandler handler : handlers)
+ {
+ handler.start();
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+ }
+ // Second pass: fresh handlers on the same sessions and consumers, committing on completion
+ Set<MyHandler> newhandlers = new HashSet<MyHandler>();
+
+ for (MyHandler handler : handlers)
+ {
+ MyHandler newHandler = new MyHandler(threadNum, numMessages, handler.session, handler.consumer);
+ newHandler.setCommitOnComplete(true);
+ newHandler.start();
+ newhandlers.add(newHandler);
+ }
+
+ handlers.clear();
+
+ handlers = newhandlers;
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed on rollback: " + handler.failure);
+ }
+ }
+
+ sessSend.close();
+
+ for (MyHandler handler : handlers)
+ {
+ handler.close();
+ }
+ // Queues are deleted through the separate session s after the handlers are closed
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + " sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+ }
+
+ protected void doTestG(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+ // Synchronous-receive scenario: consumer sessions are started before their queues and consumers are created
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = getNumSessions();
+
+ Set<MyInfo> myinfos = new HashSet<MyInfo>();
+ // One session, queue and consumer per subscription, each with its own XA transaction already started
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ ClientSession sessConsume = sf.createSession(true, false, false);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ Xid xid = newXID();
+
+ sessConsume.start(xid, XAResource.TMNOFLAGS);
+
+ myinfos.add(new MyInfo(sessConsume, consumer, xid));
+ }
+
+ ClientSession sessSend = sf.createSession(true, false, false);
+
+ transactionallySendMessages(threadNum, numMessages, sessSend);
+ consumeMessages(myinfos, numMessages, threadNum);
+ // First pass: end, prepare and roll back each consumer transaction, then start a new one on the same session
+ for (MyInfo info : myinfos)
+ {
+ info.session.end(info.xid, XAResource.TMSUCCESS);
+ info.session.prepare(info.xid);
+ info.session.rollback(info.xid);
+ info.xid = newXID();
+ info.session.start(info.xid, XAResource.TMNOFLAGS);
+ }
+ // Second pass: the messages are consumed again and this time each transaction is prepared and committed
+ consumeMessages(myinfos, numMessages, threadNum);
+
+ for (MyInfo info : myinfos)
+ {
+ info.session.end(info.xid, XAResource.TMSUCCESS);
+ info.session.prepare(info.xid);
+ info.session.commit(info.xid, false);
+ info.xid = null;
+ }
+
+ sessSend.close();
+ for (MyInfo info : myinfos)
+ {
+ info.session.close();
+ }
+ // Queues are deleted through the separate session s after the consumer sessions are closed
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+ }
+
+ protected void doTestH(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+ // Synchronous-receive scenario: consumer sessions are started only after the messages have been sent
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+ // Honour the overridable session count, as doTestC and doTestG do (the default is still 10)
+ final int numSessions = getNumSessions();
+
+ Set<MyInfo> myinfos = new HashSet<MyInfo>();
+ // One session, queue and consumer per subscription, each with its own XA transaction already started
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ ClientSession sessConsume = sf.createSession(true, false, false);
+
+ sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ Xid xid = newXID();
+
+ sessConsume.start(xid, XAResource.TMNOFLAGS);
+
+ myinfos.add(new MyInfo(sessConsume, consumer, xid));
+ }
+
+ ClientSession sessSend = sf.createSession(true, false, false);
+
+ transactionallySendMessages(threadNum, numMessages, sessSend);
+
+ for (MyInfo info : myinfos)
+ {
+ info.session.start();
+ }
+
+ consumeMessages(myinfos, numMessages, threadNum);
+ // First pass: end, prepare and roll back each consumer transaction, then start a new one on the same session
+ for (MyInfo info : myinfos)
+ {
+ info.session.end(info.xid, XAResource.TMSUCCESS);
+ info.session.prepare(info.xid);
+ info.session.rollback(info.xid);
+ info.xid = newXID();
+ info.session.start(info.xid, XAResource.TMNOFLAGS);
+ }
+ // Second pass: the messages are consumed again and this time each transaction is prepared and committed
+ consumeMessages(myinfos, numMessages, threadNum);
+
+ for (MyInfo info : myinfos)
+ {
+ info.session.end(info.xid, XAResource.TMSUCCESS);
+ info.session.prepare(info.xid);
+ info.session.commit(info.xid, false);
+ info.xid = null;
+ }
+
+ sessSend.close();
+ for (MyInfo info : myinfos)
+ {
+ info.session.close();
+ }
+ // Queues are deleted through the separate session s after the consumer sessions are closed
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+ }
+
+ /**
+ * Sends numMessages messages in an XA transaction that is rolled back, then sends them again in a second
+ * transaction that is suspended, resumed, ended and finally committed with commit(xid, true).
+ * @param threadNum value stored in each message's "threadnum" property (via sendMessages)
+ * @param numMessages number of messages sent in each of the two transactions
+ * @param sessSend session on which both transactions are started and the producer is created
+ * @throws Exception if a session or producer call fails (XAException and MessagingException are declared too)
+ */
+ private void transactionallySendMessages(final int threadNum, final int numMessages, ClientSession sessSend) throws XAException,
+ MessagingException,
+ Exception
+ {
+ Xid xid = newXID();
+ sessSend.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+ // The first batch is rolled back right after end(), without a prepare
+ sessSend.end(xid, XAResource.TMSUCCESS);
+ sessSend.rollback(xid);
+
+ xid = newXID();
+ sessSend.start(xid, XAResource.TMNOFLAGS);
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+ // Suspend and resume the second transaction before ending it
+ sessSend.end(xid, XAResource.TMSUSPEND);
+
+ sessSend.start(xid, XAResource.TMRESUME);
+
+ sessSend.end(xid, XAResource.TMSUCCESS);
+ // No prepare here: the second argument 'true' is presumably the one-phase flag - TODO confirm
+ sessSend.commit(xid, true);
+ }
+
+ private void consumeMessages(final Set<MyInfo> myinfos, final int numMessages, final int threadNum) throws Exception
+ {
+ // We make sure the messages arrive in the order they were sent from a particular producer
+ Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
+ // counts: consumer -> (sender thread number -> next "count" value expected from that sender)
+ for (int i = 0; i < numMessages; i++)
+ {
+ for (MyInfo myinfo : myinfos)
+ {
+ Map<Integer, Integer> consumerCounts = counts.get(myinfo.consumer);
+ // The lookup key must be the consumer (the map's key type); a MyInfo key never matches and disabled the check
+ if (consumerCounts == null)
+ {
+ consumerCounts = new HashMap<Integer, Integer>();
+ counts.put(myinfo.consumer, consumerCounts);
+ }
+
+ ClientMessage msg = myinfo.consumer.receive(RECEIVE_TIMEOUT);
+
+ assertNotNull(msg);
+
+ int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+ int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+
+ Integer c = consumerCounts.get(tn);
+ if (c == null)
+ {
+ c = cnt; // first message seen from this sender on this consumer: start tracking at its count
+ }
+ // Only this thread's own messages are checked against the expected count
+ if (tn == threadNum && cnt != c.intValue())
+ {
+ throw new Exception("Invalid count, expected " + tn + ": " + c + " got " + cnt);
+ }
+
+ c++;
+
+ // Wrap
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ consumerCounts.put(tn, c);
+
+ msg.acknowledge();
+ }
+ }
+ }
+
+ @Override
+ protected void start() throws Exception
+ {
+ Configuration backupConf = new ConfigurationImpl(); // the backup server is configured and started first
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ backupParams));
+ backupConf.setBackup(true);
+ backupService = Messaging.newNullStorageMessagingService(backupConf);
+ backupService.start();
+ // Live server: an acceptor without parameters, plus a connector built from backupParams named as its backup connector
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams,
+ "backup-connector");
+ connectors.put(backupTC.getName(), backupTC);
+ liveConf.setConnectorConfigurations(connectors);
+ liveConf.setBackupConnectorName(backupTC.getName());
+ liveService = Messaging.newNullStorageMessagingService(liveConf);
+ liveService.start();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadFailoverSupport#stop()
+ */
+ @Override
+ protected void stop() throws Exception
+ {
+ backupService.stop();
+ // The backup is stopped before the live server
+ liveService.stop();
+ // With both servers stopped, the in-VM registry is expected to be empty
+ assertEquals(0, InVMRegistry.instance.size());
+ }
+
+ @Override
+ protected ClientSessionFactoryInternal createSessionFactory()
+ {
+ final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams),
+ 0,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
+ // NOTE(review): first config has no params, second uses backupParams; the meaning of the 0 and 1 arguments is not visible here
+ sf.setSendWindowSize(32 * 1024);
+ return sf;
+ }
+
+ /**
+ * Hook called by sendMessages for every message it builds; this implementation writes nothing into the message.
+ */
+ protected void setBody(final ClientMessage message) throws Exception
+ {
+ }
+
+ /**
+ * Returns true when the writer index of the message body is 0; not called in the code reviewed here.
+ */
+ protected boolean checkSize(final ClientMessage message)
+ {
+ return 0 == message.getBody().writerIndex();
+ }
+
+ private void runTestMultipleThreads(final RunnableT runnable,
+ final int numThreads,
+ final boolean failOnCreateConnection,
+ final long failDelay) throws Exception
+ {
+ // Thin wrapper: forwards all arguments and supplies getNumIterations() as the iteration count
+ runMultipleThreadsFailoverTest(runnable, numThreads, getNumIterations(), failOnCreateConnection, failDelay);
+ }
+
+ private void sendMessages(final ClientSession sessSend,
+ final ClientProducer producer,
+ final int numMessages,
+ final int threadNum) throws Exception
+ {
+ for (int i = 0; i < numMessages; i++) // one message per index, created on sessSend and sent through producer
+ {
+ ClientMessage message = sessSend.createClientMessage(JBossBytesMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("threadnum"), threadNum); // read back by consumeMessages
+ message.putIntProperty(new SimpleString("count"), i); // position of the message within this batch
+ setBody(message); // the body content comes from the overridable setBody hook
+ producer.send(message);
+ }
+ }
+
+   /**
+    * Plain holder grouping a session, its consumer and an Xid.
+    * <p>
+    * Declared static because it never uses the enclosing test instance, so it does not
+    * need to carry a hidden reference to it.
+    */
+   private static class MyInfo
+   {
+      final ClientSession session;
+
+      // Not final, unlike the other two fields, so it can be reassigned after construction.
+      Xid xid;
+
+      final ClientConsumer consumer;
+
+      public MyInfo(final ClientSession session, final ClientConsumer consumer, final Xid xid)
+      {
+         this.session = session;
+         this.consumer = consumer;
+         this.xid = xid;
+      }
+   }
+
+ private class MyHandler implements MessageHandler
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+ volatile String failure;
+
+ final int tn;
+
+ final int numMessages;
+
+ final ClientSession session;
+
+ final ClientConsumer consumer;
+
+ volatile Xid xid;
+
+ volatile boolean done;
+
+ volatile boolean started = false;
+
+ volatile boolean commit = false;
+
+ synchronized void start() throws Exception
+ {
+ counts.clear();
+
+ done = false;
+
+ failure = null;
+
+ latch = new CountDownLatch(1);
+
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ started = true;
+ consumer.setMessageHandler(this);
+ session.start();
+ }
+
+ synchronized void stop() throws Exception
+ {
+ session.stop();
+ // FIXME: Remove this line when https://jira.jboss.org/jira/browse/JBMESSAGING-1549 is done
+ consumer.setMessageHandler(null);
+ started = false;
+ }
+
+ synchronized void close() throws Exception
+ {
+ stop();
+ session.close();
+ }
+
+ private synchronized void rollback()
+ {
+ try
+ {
+ stop();
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.rollback(xid);
+ }
+ catch (Exception e)
+ {
+ this.failure = e.getLocalizedMessage();
+ }
+ }
+
+ private synchronized void commit()
+ {
+ try
+ {
+ stop();
+
+ // Suspend & resume... just exercising the API as part of the test
+ session.end(xid, XAResource.TMSUSPEND);
+ session.start(xid, XAResource.TMRESUME);
+
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.commit(xid, false);
+ }
+ catch (Exception e)
+ {
+ this.failure = e.getLocalizedMessage();
+ }
+ }
+
+ MyHandler(final int threadNum, final int numMessages, final ClientSession session, final ClientConsumer consumer) throws Exception
+ {
+ tn = threadNum;
+
+ this.numMessages = numMessages;
+
+ this.session = session;
+
+ this.consumer = consumer;
+
+ }
+
+ public void setCommitOnComplete(boolean commit)
+ {
+ this.commit = commit;
+ }
+
+ public synchronized void onMessage(final ClientMessage message)
+ {
+
+ if (!started)
+ {
+ this.failure = "Received message with session stopped (thread = " + tn + ")";
+ log.error(failure);
+ return;
+ }
+
+ // log.info("*** handler got message");
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException me)
+ {
+ log.error("Failed to process", me);
+ }
+
+ if (done)
+ {
+ return;
+ }
+
+ int threadNum = (Integer)message.getProperty(new SimpleString("threadnum"));
+ int cnt = (Integer)message.getProperty(new SimpleString("count"));
+
+ Integer c = counts.get(threadNum);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ // log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
+
+ if (tn == threadNum && cnt != c.intValue())
+ {
+ failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
+ log.error(failure);
+
+ latch.countDown();
+ }
+
+ if (!checkSize(message))
+ {
+ failure = "Invalid size on message";
+ log.error(failure);
+ latch.countDown();
+ }
+
+ if (tn == threadNum && c == numMessages - 1)
+ {
+ done = true;
+ if (commit)
+ {
+ commit();
+ }
+ else
+ {
+ rollback();
+ }
+ latch.countDown();
+ }
+
+ c++;
+ // Wrap around at numMessages
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ counts.put(threadNum, c);
+
+ }
+ }
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java 2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -326,7 +326,7 @@
session.close();
}
-
+
public void testEmptyXID() throws Exception
{
Xid xid = newXID();
@@ -336,28 +336,55 @@
session.rollback(xid);
session.close();
-
+
messagingService.stop();
+
+ // do the same test with a file persistence now
+ messagingService = createService(true, configuration, addressSettings);
+
+ messagingService.start();
+
+ sessionFactory = createInVMFactory();
+
+ xid = newXID();
+ session = sessionFactory.createSession(true, false, false);
+ session.start(xid, XAResource.TMNOFLAGS);
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.commit(xid, false);
+
+
+ xid = newXID();
+ session = sessionFactory.createSession(true, false, false);
+ session.start(xid, XAResource.TMNOFLAGS);
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.rollback(xid);
+
- // Enable this when https://jira.jboss.org/jira/browse/JBMESSAGING-1548 is done
-
-// // do the same test with a file persistence now
-// messagingService = createService(true, configuration, addressSettings);
-//
-// messagingService.start();
-//
-// sessionFactory = createInVMFactory();
-//
-// xid = newXID();
-// session = sessionFactory.createSession(true, false, false);
-// session.start(xid, XAResource.TMNOFLAGS);
-// session.end(xid, XAResource.TMSUCCESS);
-// session.rollback(xid);
-
+ session.close();
+ messagingService.stop();
+ messagingService.start();
+
+ // This is not really necessary... But since the server has stopped, I would prefer to keep recreating the factory
+ sessionFactory = createInVMFactory();
+
+ session = sessionFactory.createSession(true, false, false);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+
+ assertEquals(0, xids.length);
+
+ session.close();
+
}
-
public void testForget() throws Exception
{
clientSession.forget(newXID());
Modified: trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java 2009-03-18 17:32:19 UTC (rev 6109)
+++ trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java 2009-03-18 21:49:31 UTC (rev 6110)
@@ -49,8 +49,9 @@
public void testMessageChunkFilePersistence1G() throws Exception
{
- testChunks(true,
+ testChunks(false,
true,
+ true,
false,
true,
false,
More information about the jboss-cvs-commits
mailing list