[jboss-cvs] JBoss Messaging SVN: r6776 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 13 22:14:30 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-05-13 22:14:30 -0400 (Wed, 13 May 2009)
New Revision: 6776
Modified:
trunk/docs/user-manual/en/modules/large-messages.xml
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1625 - XARollback and ordering on backup
Modified: trunk/docs/user-manual/en/modules/large-messages.xml
===================================================================
--- trunk/docs/user-manual/en/modules/large-messages.xml 2009-05-14 01:47:11 UTC (rev 6775)
+++ trunk/docs/user-manual/en/modules/large-messages.xml 2009-05-14 02:14:30 UTC (rev 6776)
@@ -33,8 +33,7 @@
class</para>
<programlisting> ClientSessionFactory factory = ....;
factory.setMinLargeMessageSize(25000);</programlisting>
- <para>The default value is aways 100K which is a good value for most cases based on our
- tests.</para>
+ <para>The default value is aways 100K which is a good value for most cases.</para>
</section>
<section>
<title>Streaming Large messages</title>
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-05-14 01:47:11 UTC (rev 6775)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-05-14 02:14:30 UTC (rev 6776)
@@ -511,7 +511,7 @@
}
else
{
- final HashSet<Queue> queues = lockUsedQueues();
+ final HashSet<Queue> queues = lockUsedQueues(null);
replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
{
@@ -625,17 +625,30 @@
public void handleXARollback(final SessionXARollbackMessage packet)
{
+
if (replicatingChannel == null)
{
doHandleXARollback(packet);
}
else
{
+ final Set<Queue> queues = lockUsedQueues(packet.getXid());
+
replicatingChannel.replicatePacket(packet, oppositeChannelID, new Runnable()
{
public void run()
{
- doHandleXARollback(packet);
+ try
+ {
+ doHandleXARollback(packet);
+ }
+ finally
+ {
+ for (Queue queue : queues)
+ {
+ queue.unlockDelivery();
+ }
+ }
}
});
}
@@ -865,7 +878,7 @@
}
else
{
- final HashSet<Queue> queues = lockUsedQueues();
+ final HashSet<Queue> queues = lockUsedQueues(null);
// We need to stop the consumers first before replicating, to ensure no deliveries occur after this,
// but we need to process the actual close() when the replication response returns, otherwise things
@@ -2508,12 +2521,12 @@
postOffice.route(msg, tx);
}
}
-
+
/**
* We need to avoid delivery when rolling back while doing replication, or the backup node could be on a different order
* @return
*/
- private HashSet<Queue> lockUsedQueues()
+ private HashSet<Queue> lockUsedQueues(Xid xid)
{
final HashSet<Queue> queues = new HashSet<Queue>();
@@ -2522,11 +2535,21 @@
queues.add(consumer.getQueue());
}
- if (tx != null)
+ Transaction localTX;
+ if (xid == null)
{
- queues.addAll(tx.getDistinctQueues());
+ localTX = tx;
}
+ else
+ {
+ localTX = resourceManager.getTransaction(xid);
+ }
+ if (localTX != null)
+ {
+ queues.addAll(localTX.getDistinctQueues());
+ }
+
for (Queue queue : queues)
{
queue.lockDelivery();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java 2009-05-14 01:47:11 UTC (rev 6775)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/OrderingOnBackupTest.java 2009-05-14 02:14:30 UTC (rev 6776)
@@ -22,14 +22,15 @@
package org.jboss.messaging.tests.integration.cluster.failover;
-import java.lang.reflect.Method;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
@@ -44,13 +45,10 @@
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
import org.jboss.messaging.core.postoffice.QueueBinding;
-import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.QueueImpl;
-import org.jboss.messaging.core.server.impl.ServerConsumerImpl;
import org.jboss.messaging.tests.util.RandomUtil;
-import org.jboss.messaging.tests.util.SpawnedVMSupport;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -269,9 +267,19 @@
}
+ public void testDeliveryOrderOnTransactionalRollbackMultiThreadXA() throws Exception
+ {
+ internalTestDeliveryOrderOnTransactionalRollbackMultiThread(true);
+ }
+
public void testDeliveryOrderOnTransactionalRollbackMultiThread() throws Exception
{
+ internalTestDeliveryOrderOnTransactionalRollbackMultiThread(false);
+ }
+ public void internalTestDeliveryOrderOnTransactionalRollbackMultiThread(final boolean isXA) throws Exception
+ {
+
final SimpleString ADDRESS = new SimpleString("TEST");
final SimpleString PROPERTY_KEY = new SimpleString("KEY-STR");
@@ -314,7 +322,7 @@
msg.putStringProperty(PROPERTY_KEY, RandomUtil.randomSimpleString());
prod.send(msg);
}
-
+
sess.commit();
}
catch (Throwable e)
@@ -346,6 +354,8 @@
final CountDownLatch latchStart;
+ Xid xid = null;
+
final boolean rollback;
ConsumerThread(final ClientSessionFactory sf,
@@ -359,17 +369,43 @@
this.rollback = rollback;
}
+ public void close()
+ {
+ try
+ {
+ if (xid != null)
+ {
+ sess.rollback(xid);
+ xid = null;
+ }
+ sess.close();
+ sess = null;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+
@Override
public void run()
{
+
ClientConsumer cons = null;
try
{
latchAlign.countDown();
latchStart.await();
- sess = sf.createSession(false, false, false);
+ sess = sf.createSession(isXA, false, false);
+ if (isXA)
+ {
+ xid = newXID();
+ sess.start(xid, XAResource.TMNOFLAGS);
+ }
+
cons = sess.createConsumer(ADDRESS);
sess.start();
@@ -390,9 +426,21 @@
{
try
{
+ if (isXA)
+ {
+ sess.end(xid, XAResource.TMSUCCESS);
+ }
if (rollback)
{
- sess.rollback();
+ if (isXA)
+ {
+ sess.rollback(xid);
+ xid = null;
+ }
+ else
+ {
+ sess.rollback();
+ }
cons.close();
}
}
@@ -471,7 +519,7 @@
{
if (t.sess != null)
{
- t.sess.close();
+ t.close();
}
}
More information about the jboss-cvs-commits
mailing list