[jboss-cvs] JBoss Messaging SVN: r7270 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 9 07:29:06 EDT 2009
Author: gaohoward
Date: 2009-06-09 07:29:06 -0400 (Tue, 09 Jun 2009)
New Revision: 7270
Modified:
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
Log:
JBMESSAGING-1643
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2009-06-09 08:48:39 UTC (rev 7269)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroup.java 2009-06-09 11:29:06 UTC (rev 7270)
@@ -173,6 +173,22 @@
return groupName;
}
+ /**
+ * @param ref
+ */
+ public void unmarkSending(MessageReference ref)
+ {
+ if (sortedList.size() == 0)
+ {
+ return;
+ }
+ ReferenceHolder holder = sortedList.get(0);
+ if (holder.matchMessage(ref))
+ {
+ holder.unmarkSending();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java 2009-06-09 08:48:39 UTC (rev 7269)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/OrderingGroupMonitor.java 2009-06-09 11:29:06 UTC (rev 7270)
@@ -175,7 +175,7 @@
OrderingGroup group = orderingGroups.get(grpName);
if (group != null)
{
- group.unregister(ref);
+ group.unmarkSending(ref);
}
}
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java 2009-06-09 08:48:39 UTC (rev 7269)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java 2009-06-09 11:29:06 UTC (rev 7270)
@@ -22,6 +22,8 @@
package org.jboss.test.messaging.jms;
+import java.util.ArrayList;
+
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
@@ -33,6 +35,7 @@
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XASession;
+import javax.management.ObjectName;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -914,7 +917,575 @@
}
}
}
+
+ /*
+ * send 4 messages, start a XA transaction to receive, roll back the tx.
+ * then start another transaction, receive the 1st message, then try to receive second again.
+ * the second shouldn't be received.
+ */
+ public void testSimpleXATransactionalRollbackReceive2() throws Exception
+ {
+ log.trace("starting testSimpleXATransactionalRollbackReceive2");
+ Connection conn1 = null;
+
+ XAConnection xconn1 = null;
+
+ try
+ {
+ // First send a message to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+ TextMessage tm3 = sess1.createTextMessage("tm3");
+ TextMessage tm4 = sess1.createTextMessage("tm4");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ xconn1 = cf.createXAConnection();
+
+ XASession xsess1 = xconn1.createXASession();
+
+ XAResource res1 = xsess1.getXAResource();
+
+ // Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = xsess1.createConsumer(queue1);
+
+ xconn1.start();
+
+ // Consume the message
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ // prepare the tx
+ res1.prepare(xid1);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ res1.rollback(xid1);
+
+ //start another Tx, and rollback again.
+ Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+ res1.start(xid2, XAResource.TMNOFLAGS);
+
+ rm1 = (TextMessage)cons.receive(1000);
+ assertNotNull(rm1);
+ assertEquals(tm1.getText(), rm1.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ res1.end(xid2, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ // prepare the tx
+ res1.prepare(xid2);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ res1.commit(xid2, false);
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm2.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm3.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm4.getText());
+
+ checkEmpty(queue1);
+
+ conn1.close();
+
+ xconn1.close();
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+
+ if (xconn1 != null)
+ {
+ try
+ {
+ xconn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /*
+ * send 4 messages, start a XA transaction to receive m1, roll back and close the session set.
+ * then start another transaction, receive the 1st message, then try to receive second again.
+ * the second shouldn't be received.
+ */
+ public void testSimpleXATransactionalRollbackReceive3() throws Exception
+ {
+ log.trace("starting testSimpleXATransactionalRollbackReceive2");
+
+ Connection conn1 = null;
+
+ XAConnection xconn1 = null;
+
+ try
+ {
+ // First send a message to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+ TextMessage tm3 = sess1.createTextMessage("tm3");
+ TextMessage tm4 = sess1.createTextMessage("tm4");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ xconn1 = cf.createXAConnection();
+
+ XASession xsess1 = xconn1.createXASession();
+
+ XAResource res1 = xsess1.getXAResource();
+
+ // Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = xsess1.createConsumer(queue1);
+
+ xconn1.start();
+
+ // Consume the message
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ // prepare the tx
+ res1.prepare(xid1);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ res1.rollback(xid1);
+
+ xsess1.getSession().close();
+
+ XASession xsess2 = xconn1.createXASession();
+
+ XAResource res2 = xsess2.getXAResource();
+
+ MessageConsumer cons2 = xsess2.createConsumer(queue1);
+
+ //start another Tx, commit it.
+ Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+ res2.start(xid2, XAResource.TMNOFLAGS);
+
+ rm1 = (TextMessage)cons2.receive(1000);
+ assertNotNull(rm1);
+ assertEquals(tm1.getText(), rm1.getText());
+
+ rm1 = (TextMessage)cons2.receive(2000);
+ assertNull(rm1);
+
+ res2.end(xid2, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons2.receive(2000);
+ assertNull(rm1);
+
+ // prepare the tx
+ res2.prepare(xid2);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons2.receive(2000);
+ assertNull(rm1);
+
+ res2.commit(xid2, false);
+
+ rm1 = (TextMessage)cons2.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm2.getText());
+
+ rm1 = (TextMessage)cons2.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm3.getText());
+
+ rm1 = (TextMessage)cons2.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), tm4.getText());
+
+ checkEmpty(queue1);
+
+ conn1.close();
+
+ xconn1.close();
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+
+ if (xconn1 != null)
+ {
+ try
+ {
+ xconn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /*
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1643
+ */
+ public void testSimpleConsumerCloseReceive() throws Exception
+ {
+ log.trace("starting testSimpleConsumerCloseReceive");
+
+ Connection conn1 = null;
+
+ try
+ {
+ // First send a message to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+ TextMessage tm3 = sess1.createTextMessage("tm3");
+ TextMessage tm4 = sess1.createTextMessage("tm4");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ conn1.start();
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+ TextMessage rm1 = (TextMessage)cons1.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), "tm1");
+
+ TextMessage rm2 = (TextMessage)cons1.receive(2000);
+ assertNull(rm2);
+
+ //Next message not available to another consumer either.
+ MessageConsumer cons2 = sess1.createConsumer(queue1);
+ rm2 = (TextMessage)cons2.receive(2000);
+ assertNull(rm2);
+
+ //ack rm1
+ rm1.acknowledge();
+
+ cons1.close();
+
+ rm1 = (TextMessage)cons2.receive(1000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), "tm2");
+ rm1.acknowledge();
+
+ rm1 = (TextMessage)cons2.receive(1000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), "tm3");
+ rm1.acknowledge();
+
+ rm1 = (TextMessage)cons2.receive(1000);
+ assertNotNull(rm1);
+ assertEquals(rm1.getText(), "tm4");
+ rm1.acknowledge();
+
+ checkEmpty(queue1);
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+
+ }
+
+ }
+
+ /*
+ * send 4 messages, start a XA transaction to receive, roll back 2nd message until it
+ * goes to DLQ, then continue to receive the messages, check
+ * the messages received order.
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1643
+ */
+ public void testSimpleXATransactionalRollbackReceive4() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ return;
+ }
+
+ log.trace("starting testSimpleXATransactionalRollbackReceive4");
+
+ Connection conn1 = null;
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";
+
+ final int MAX_DELIVERIES = 5;
+
+ try
+ {
+ String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+ ServerManagement.setAttribute(serverPeerObjectName,
+ "DefaultMaxDeliveryAttempts",
+ String.valueOf(MAX_DELIVERIES));
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+ // First send a message to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("A");
+ TextMessage tm2 = sess1.createTextMessage("B");
+ TextMessage tm3 = sess1.createTextMessage("C");
+ TextMessage tm4 = sess1.createTextMessage("D");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ conn1.start();
+
+ ArrayList<String> rList = new ArrayList<String>();
+ int i = 0;
+ while (true)
+ {
+ XAConnection xconn1 = null;
+ XASession xsess1 = null;
+ MessageConsumer xcons1 = null;
+ i++;
+ try
+ {
+ xconn1 = cf.createXAConnection();
+
+ xsess1 = xconn1.createXASession();
+
+ XAResource res1 = xsess1.getXAResource();
+
+ String bqStr = "bq" + i;
+
+ // Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid(bqStr.getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ xcons1 = xsess1.createConsumer(queue1);
+
+ xconn1.start();
+
+ TextMessage rm1 = (TextMessage)xcons1.receive(5000);
+
+ if (rm1 == null)
+ {
+ break;
+ }
+
+ rList.add(rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ // prepare the tx
+ res1.prepare(xid1);
+
+ if (rm1.getText().equals("B"))
+ {
+ res1.rollback(xid1);
+ }
+ else
+ {
+ res1.commit(xid1, false);
+ }
+ }
+ finally
+ {
+ if (xcons1 != null)
+ {
+ try
+ {
+ xcons1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ if (xsess1 != null)
+ {
+ try
+ {
+ xsess1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ if (xconn1 != null)
+ {
+ try
+ {
+ xconn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ //examine result
+ assertEquals(rList.size(), 8);
+ assertEquals("A", rList.get(0));
+ assertEquals("B", rList.get(1));
+ assertEquals("B", rList.get(2));
+ assertEquals("B", rList.get(3));
+ assertEquals("B", rList.get(4));
+ assertEquals("B", rList.get(5));
+ assertEquals("C", rList.get(6));
+ assertEquals("D", rList.get(7));
+
+ MessageConsumer cons1 = sess1.createConsumer(queue2);
+ TextMessage dMsg = (TextMessage)cons1.receive(2000);
+ assertNotNull(dMsg);
+ assertEquals(dMsg.getText(), "B");
+
+ checkEmpty(queue1);
+ checkEmpty(queue2);
+ }
+ finally
+ {
+ ServerManagement.setAttribute(serverPeerObjectName,
+ "DefaultDLQ",
+ "jboss.messaging.destination:service=Queue,name=DLQ");
+
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list