[jboss-cvs] JBoss Messaging SVN: r7218 - in branches/Branch_1_4: tests/src/org/jboss/test/messaging/jms and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 5 05:03:15 EDT 2009
Author: gaohoward
Date: 2009-06-05 05:03:15 -0400 (Fri, 05 Jun 2009)
New Revision: 7218
Modified:
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
Log:
JBMESSAGING-1638
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2009-06-05 08:02:44 UTC (rev 7217)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2009-06-05 09:03:15 UTC (rev 7218)
@@ -834,6 +834,22 @@
{
deliveringCount.decrement();
}
+
+ MessageReference ref = d.getReference();
+ if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
+ {
+ if (trace)
+ {
+ log.trace("Ordering group message " + ref + " has been completed, trying to send next.");
+ }
+ synchronized (lock)
+ {
+ if (monitor.messageCompleted(ref))
+ {
+ deliverInternal();
+ }
+ }
+ }
}
else
{
@@ -850,22 +866,6 @@
pm.addTransaction(tx);
}
}
-
- MessageReference ref = d.getReference();
- if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
- {
- if (trace)
- {
- log.trace("Ordering group message " + ref + " has been completed, trying to send next.");
- }
- synchronized (lock)
- {
- if (monitor.messageCompleted(ref))
- {
- deliverInternal();
- }
- }
- }
}
protected InMemoryCallback getCallback(Transaction tx)
@@ -1054,6 +1054,19 @@
{
deliveringCount.decrement();
}
+
+ MessageReference ref = del.getReference();
+ if (OrderingGroupMonitor.isOrderingGroupMessage(ref))
+ {
+ if (trace)
+ {
+ log.trace("Ordering group message " + ref + " has been completed, trying to send next.");
+ }
+ synchronized (lock)
+ {
+ promptDelivery = (promptDelivery || monitor.messageCompleted(ref));
+ }
+ }
}
// prompt delivery
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java 2009-06-05 08:02:44 UTC (rev 7217)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/OrderingGroupAckTest.java 2009-06-05 09:03:15 UTC (rev 7218)
@@ -429,6 +429,8 @@
XASession xsess2 = xconn2.createXASession();
+ MessageConsumer cons2 = xsess2.createConsumer(queue1);
+
XAResource res3 = xsess2.getXAResource();
Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
@@ -438,6 +440,10 @@
assertEquals(0, xids2.length);
assertEquals(xid1, xids[0]);
+
+ //before recover and commit the message cannot be received.
+ rm1 = (TextMessage)cons2.receive(2000);
+ assertNull(rm1);
// Commit the tx
@@ -498,7 +504,417 @@
}
}
}
+
+
+ /*
+ * Sends 4 ordering group messages and receives tm1 in an XA tx: while that tx is ended or prepared the next
+ * message must not be delivered. The tx is rolled back twice; afterwards tm1..tm4 are received in order.
+ */
+ public void testSimpleXATransactionalRollbackReceive() throws Exception
+ {
+ log.trace("starting testSimpleXATransactionalRollbackReceive");
+ Connection conn1 = null;
+
+ XAConnection xconn1 = null;
+
+ try
+ {
+ // First send four messages of the same ordering group to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+ TextMessage tm3 = sess1.createTextMessage("tm3");
+ TextMessage tm4 = sess1.createTextMessage("tm4");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ xconn1 = cf.createXAConnection();
+
+ XASession xsess1 = xconn1.createXASession();
+
+ XAResource res1 = xsess1.getXAResource();
+
+ // Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = xsess1.createConsumer(queue1);
+
+ xconn1.start();
+
+ // Consume the first message inside the tx
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ // prepare the tx
+ res1.prepare(xid1);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ res1.rollback(xid1);
+
+ //start another Tx, and rollback again.
+ Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+ res1.start(xid2, XAResource.TMNOFLAGS);
+
+ rm1 = (TextMessage)cons.receive(1000);
+ assertNotNull(rm1);
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid2, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ // prepare the tx
+ res1.prepare(xid2);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ res1.rollback(xid2);
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm1.getText(), rm1.getText());
+
+ // the rest of the group is expected to follow in sending order
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm2.getText(), rm1.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm3.getText(), rm1.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm4.getText(), rm1.getText());
+
+ checkEmpty(queue1);
+
+ conn1.close();
+
+ xconn1.close();
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+
+ if (xconn1 != null)
+ {
+ try
+ {
+ xconn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /*
+ * Sends 4 ordering group messages and receives tm1 in an XA tx that is rolled back, then again in a second
+ * tx that is prepared and committed (two-phase). tm2..tm4 must be delivered only after that commit.
+ */
+ public void testSimpleXATransactionalRollbackCommitReceive() throws Exception
+ {
+ log.trace("starting testSimpleXATransactionalRollbackCommitReceive");
+
+ Connection conn1 = null;
+
+ XAConnection xconn1 = null;
+
+ try
+ {
+ // First send four messages of the same ordering group to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+ TextMessage tm3 = sess1.createTextMessage("tm3");
+ TextMessage tm4 = sess1.createTextMessage("tm4");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ xconn1 = cf.createXAConnection();
+
+ XASession xsess1 = xconn1.createXASession();
+
+ XAResource res1 = xsess1.getXAResource();
+
+ // Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = xsess1.createConsumer(queue1);
+
+ xconn1.start();
+
+ // Consume the first message inside the tx
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ //roll back directly
+ res1.rollback(xid1);
+
+ //start another Tx, this time prepare and commit it.
+ Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+ res1.start(xid2, XAResource.TMNOFLAGS);
+
+ rm1 = (TextMessage)cons.receive(1000);
+ assertNotNull(rm1);
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid2, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ // prepare the tx
+ res1.prepare(xid2);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ res1.commit(xid2, false);
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm2.getText(), rm1.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm3.getText(), rm1.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm4.getText(), rm1.getText());
+
+ checkEmpty(queue1);
+
+ conn1.close();
+
+ xconn1.close();
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+
+ if (xconn1 != null)
+ {
+ try
+ {
+ xconn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
+ /*
+ * Sends 4 ordering group messages and receives tm1 in an XA tx that is rolled back, then again in a second
+ * tx that is committed in one phase (no prepare). tm2..tm4 must be delivered only after that commit.
+ */
+ public void testSimpleXATransactionalCommitOnephaseReceive() throws Exception
+ {
+ log.trace("starting testSimpleXATransactionalCommitOnephaseReceive");
+
+ Connection conn1 = null;
+
+ XAConnection xconn1 = null;
+
+ try
+ {
+ // First send four messages of the same ordering group to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ JBossMessageProducer prod = (JBossMessageProducer)sess1.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ prod.enableOrderingGroup("testSimpleXATransactionalRollbackReceive");
+
+ TextMessage tm1 = sess1.createTextMessage("tm1");
+ TextMessage tm2 = sess1.createTextMessage("tm2");
+ TextMessage tm3 = sess1.createTextMessage("tm3");
+ TextMessage tm4 = sess1.createTextMessage("tm4");
+
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ prod.send(tm4);
+
+ xconn1 = cf.createXAConnection();
+
+ XASession xsess1 = xconn1.createXASession();
+
+ XAResource res1 = xsess1.getXAResource();
+
+ // Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = xsess1.createConsumer(queue1);
+
+ xconn1.start();
+
+ // Consume the first message inside the tx
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ //roll back directly
+ res1.rollback(xid1);
+
+ //start another Tx, this time commit it in one phase.
+ Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli".getBytes());
+ res1.start(xid2, XAResource.TMNOFLAGS);
+
+ rm1 = (TextMessage)cons.receive(1000);
+ assertNotNull(rm1);
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid2, XAResource.TMSUCCESS);
+
+ // next message should not be received.
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNull(rm1);
+
+ res1.commit(xid2, true);
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm2.getText(), rm1.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm3.getText(), rm1.getText());
+
+ rm1 = (TextMessage)cons.receive(2000);
+ assertNotNull(rm1);
+ assertEquals(tm4.getText(), rm1.getText());
+
+ checkEmpty(queue1);
+
+ conn1.close();
+
+ xconn1.close();
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+
+ if (xconn1 != null)
+ {
+ try
+ {
+ xconn1.close();
+ }
+ catch (Exception e)
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list