[jboss-cvs] JBoss Messaging SVN: r6079 - in trunk: src/main/org/jboss/messaging/core/server/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 13 11:57:44 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-13 11:57:44 -0400 (Fri, 13 Mar 2009)
New Revision: 6079
Modified:
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
Log:
Synchronizing some of the transaction operations...
as discussed here:
http://www.jboss.org/index.html?module=bb&op=viewtopic&t=152174
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-03-13 15:31:21 UTC (rev 6078)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-03-13 15:57:44 UTC (rev 6079)
@@ -842,18 +842,23 @@
private final PageMessageOperation getPageOperation(final Transaction tx)
{
- PageMessageOperation oper = (PageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION);
-
- if (oper == null)
+ // races are possible when two sessions are using the same XID,
+ // so this whole operation needs to be atomic per TX
+ synchronized (tx)
{
- oper = new PageMessageOperation();
-
- tx.putProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION, oper);
-
- tx.addOperation(oper);
+ PageMessageOperation oper = (PageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION);
+
+ if (oper == null)
+ {
+ oper = new PageMessageOperation();
+
+ tx.putProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION, oper);
+
+ tx.addOperation(oper);
+ }
+
+ return oper;
}
-
- return oper;
}
private class MessageExpiryRunner implements Runnable
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-03-13 15:31:21 UTC (rev 6078)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-03-13 15:57:44 UTC (rev 6079)
@@ -626,18 +626,21 @@
final RefsOperation getRefsOperation(final Transaction tx)
{
- RefsOperation oper = (RefsOperation)tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
-
- if (oper == null)
+ synchronized (tx)
{
- oper = new RefsOperation();
-
- tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
-
- tx.addOperation(oper);
+ RefsOperation oper = (RefsOperation)tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+
+ if (oper == null)
+ {
+ oper = new RefsOperation();
+
+ tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
+
+ tx.addOperation(oper);
+ }
+
+ return oper;
}
-
- return oper;
}
public void cancel(final Transaction tx, final MessageReference reference) throws Exception
@@ -1000,6 +1003,12 @@
{
return name.hashCode();
}
+
+ @Override
+ public String toString()
+ {
+ return "QueueImpl(name=" + this.name.toString() + ")";
+ }
// Private
// ------------------------------------------------------------------------------
@@ -1463,12 +1472,12 @@
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
- void addRef(final MessageReference ref)
+ synchronized void addRef(final MessageReference ref)
{
refsToAdd.add(ref);
}
- void addAck(final MessageReference ref)
+ synchronized void addAck(final MessageReference ref)
{
refsToAck.add(ref);
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-03-13 15:31:21 UTC (rev 6078)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-03-13 15:57:44 UTC (rev 6079)
@@ -293,7 +293,7 @@
this.messagingException = messagingException;
}
- public void addOperation(final TransactionOperation operation)
+ public synchronized void addOperation(final TransactionOperation operation)
{
checkCreateOperations();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java 2009-03-13 15:31:21 UTC (rev 6078)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaTest.java 2009-03-13 15:57:44 UTC (rev 6079)
@@ -26,16 +26,21 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -48,6 +53,7 @@
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*/
public class BasicXaTest extends ServiceTestBase
{
@@ -76,6 +82,7 @@
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
configuration.setPagingDirectory(getPageDir());
+ configuration.setPagingMaxGlobalSizeBytes(0); // no paging for these tests
messagingService = createService(false, configuration, addressSettings);
@@ -126,7 +133,7 @@
validateRM(sessionFactory, sessionFactory);
validateRM(nettyFactory, sessionFactory);
}
-
+
private void validateRM(ClientSessionFactory factory1, ClientSessionFactory factory2) throws Exception
{
ClientSession session1 = factory1.createSession(true, false, false);
@@ -227,7 +234,7 @@
clientSession.commit(xid, true);
clientSession.start();
clientConsumer = clientSession.createConsumer(atestq);
- m = clientConsumer.receive(1000);
+ m = clientConsumer.receiveImmediate();
assertNull(m);
}
@@ -266,28 +273,40 @@
public void testSendMultipleQueues() throws Exception
{
- multipleQueuesInternalTest(true, false, false, false);
+ multipleQueuesInternalTest(true, false, false, false, false);
}
public void testSendMultipleQueuesOnePhase() throws Exception
{
- multipleQueuesInternalTest(true, false, false, true);
- multipleQueuesInternalTest(false, false, true, true);
+ multipleQueuesInternalTest(true, false, false, false, true);
+ multipleQueuesInternalTest(false, false, true, false, true);
}
+ public void testSendMultipleQueuesOnePhaseJoin() throws Exception
+ {
+ multipleQueuesInternalTest(true, false, false, true, true);
+ multipleQueuesInternalTest(false, false, true, true, true);
+ }
+
+ public void testSendMultipleQueuesTwoPhaseJoin() throws Exception
+ {
+ multipleQueuesInternalTest(true, false, false, true, false);
+ multipleQueuesInternalTest(false, false, true, true, false);
+ }
+
public void testSendMultipleQueuesRecreate() throws Exception
{
- multipleQueuesInternalTest(true, false, true, false);
+ multipleQueuesInternalTest(true, false, true, false, false);
}
public void testSendMultipleSuspend() throws Exception
{
- multipleQueuesInternalTest(true, true, false, false);
+ multipleQueuesInternalTest(true, true, false, false, false);
}
public void testSendMultipleSuspendRecreate() throws Exception
{
- multipleQueuesInternalTest(true, true, true, false);
+ multipleQueuesInternalTest(true, true, true, false, false);
}
public void testSendMultipleSuspendErrorCheck() throws Exception
@@ -313,6 +332,78 @@
session.close();
}
+ public void testForget() throws Exception
+ {
+ clientSession.forget(newXID());
+ }
+
+ public void testSimpleJoin() throws Exception
+ {
+ sessionFactory.setBlockOnPersistentSend(true);
+
+ SimpleString ADDRESS1 = new SimpleString("Address-1");
+ SimpleString ADDRESS2 = new SimpleString("Address-2");
+
+ clientSession.createQueue(ADDRESS1, ADDRESS1, true);
+ clientSession.createQueue(ADDRESS2, ADDRESS2, true);
+
+ Xid xid = newXID();
+
+ ClientSession sessionA = sessionFactory.createSession(true, false, false);
+ sessionA.start(xid, XAResource.TMNOFLAGS);
+
+ ClientSession sessionB = sessionFactory.createSession(true, false, false);
+ sessionB.start(xid, XAResource.TMJOIN);
+
+ ClientProducer prodA = sessionA.createProducer(ADDRESS1);
+ ClientProducer prodB = sessionB.createProducer(ADDRESS2);
+
+ for (int i = 0; i < 100; i++)
+ {
+ prodA.send(createTextMessage(sessionA, "A" + i));
+ prodB.send(createTextMessage(sessionB, "B" + i));
+ }
+
+ sessionA.end(xid, XAResource.TMSUCCESS);
+ sessionB.end(xid, XAResource.TMSUCCESS);
+
+ sessionB.close();
+
+ sessionA.commit(xid, true);
+
+ sessionA.close();
+
+ xid = newXID();
+
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+
+ ClientConsumer cons1 = clientSession.createConsumer(ADDRESS1);
+ ClientConsumer cons2 = clientSession.createConsumer(ADDRESS2);
+ clientSession.start();
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = cons1.receive(1000);
+ assertNotNull(msg);
+ assertEquals("A" + i, getTextMessage(msg));
+ msg.acknowledge();
+
+ msg = cons2.receive(1000);
+ assertNotNull(msg);
+ assertEquals("B" + i, getTextMessage(msg));
+ msg.acknowledge();
+ }
+
+ assertNull(cons1.receiveImmediate());
+ assertNull(cons2.receiveImmediate());
+
+ clientSession.end(xid, XAResource.TMSUCCESS);
+
+ clientSession.commit(xid, true);
+
+ clientSession.close();
+ }
+
/**
* @throws MessagingException
* @throws XAException
@@ -320,7 +411,8 @@
protected void multipleQueuesInternalTest(boolean createQueues,
boolean suspend,
boolean recreateSession,
- boolean onePhase) throws MessagingException, XAException
+ boolean isJoinSession,
+ boolean onePhase) throws Exception
{
int NUMBER_OF_MSGS = 100;
int NUMBER_OF_QUEUES = 10;
@@ -328,6 +420,8 @@
SimpleString ADDRESS = new SimpleString("Address");
+ ClientSession newJoinSession = null;
+
try
{
@@ -338,6 +432,11 @@
for (int i = 0; i < NUMBER_OF_QUEUES; i++)
{
session.createQueue(ADDRESS, ADDRESS.concat(Integer.toString(i)), true);
+ if (isJoinSession)
+ {
+ clientSession.createQueue(ADDRESS.concat("-join"), ADDRESS.concat("-join." + i), true);
+ }
+
}
}
@@ -365,8 +464,30 @@
prod.close();
+ if (isJoinSession)
+ {
+ newJoinSession = sessionFactory.createSession(true, false, false);
+
+ // This is a basic condition, or a real TM wouldn't be able to join both sessions in a single
+ // transaction
+ assertTrue(session.isSameRM(newJoinSession));
+
+ newJoinSession.start(xid, XAResource.TMJOIN);
+
+ // The Join Session will have its own queue, as it's not possible to guarantee ordering since this
+ // producer will be using a different session
+ ClientProducer newProd = newJoinSession.createProducer(ADDRESS.concat("-join"));
+ newProd.send(createTextMessage(newJoinSession, "After Join"));
+ }
+
session.end(xid, XAResource.TMSUCCESS);
+ if (isJoinSession)
+ {
+ newJoinSession.end(xid, XAResource.TMSUCCESS);
+ newJoinSession.close();
+ }
+
if (!onePhase)
{
session.prepare(xid);
@@ -416,6 +537,7 @@
ClientMessage msg = consumer.receive(1000);
assertNotNull(msg);
+ assertEquals("one more", getTextMessage(msg));
msg.acknowledge();
if (suspend)
@@ -426,8 +548,28 @@
assertEquals("one more", getTextMessage(msg));
+ if (isJoinSession)
+ {
+ ClientSession newSession = sessionFactory.createSession(true, false, false);
+
+ newSession.start(xid, XAResource.TMJOIN);
+
+ newSession.start();
+
+ ClientConsumer newConsumer = newSession.createConsumer(ADDRESS.concat("-join." + nqueues));
+
+ msg = newConsumer.receive(1000);
+ assertNotNull(msg);
+
+ assertEquals("After Join", getTextMessage(msg));
+ msg.acknowledge();
+
+ newSession.end(xid, XAResource.TMSUCCESS);
+
+ newSession.close();
+ }
+
assertNull(consumer.receiveImmediate());
-
consumer.close();
}
More information about the jboss-cvs-commits
mailing list