[jboss-cvs] JBoss Messaging SVN: r5488 - in branches/Branch_1_4: src/main/org/jboss/messaging/core/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 9 09:52:49 EST 2008
Author: gaohoward
Date: 2008-12-09 09:52:49 -0500 (Tue, 09 Dec 2008)
New Revision: 5488
Modified:
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
Log:
JBMESSAGING-1440
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java 2008-12-09 14:11:33 UTC (rev 5487)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java 2008-12-09 14:52:49 UTC (rev 5488)
@@ -48,4 +48,10 @@
void cancel() throws Throwable;
boolean isRecovered();
+
+ /**
+ * Mark if this delivery is with XA transaction.
+ * propagated from the receiver.
+ */
+ boolean isXA();
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2008-12-09 14:11:33 UTC (rev 5487)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java 2008-12-09 14:52:49 UTC (rev 5488)
@@ -288,6 +288,13 @@
acknowledgeInternal(d, null, false);
}
+ /*
+ * Note: If a XA tx is not committed while failure happens, cancelling of the
+ * delivery shouldn't put the message with transactions back to re-deliver.
+ * It must be there in DB until the transaction recovery happens.
+ *
+ * @see org.jboss.messaging.core.contract.DeliveryObserver#cancel(org.jboss.messaging.core.contract.Delivery)
+ */
public void cancel(Delivery del) throws Throwable
{
//We may need to update the delivery count in the database
@@ -306,7 +313,13 @@
if (!checkAndSchedule(ref))
{
- cancelInternal(ref);
+ /*
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+ */
+ if (!del.isXA())
+ {
+ cancelInternal(ref);
+ }
}
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java 2008-12-09 14:11:33 UTC (rev 5487)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java 2008-12-09 14:52:49 UTC (rev 5488)
@@ -50,6 +50,7 @@
private DeliveryObserver observer;
private MessageReference reference;
private boolean recovered;
+ private boolean xa;
private boolean trace = log.isTraceEnabled();
@@ -73,6 +74,7 @@
this.observer = observer;
this.selectorAccepted = selectorAccepted;
this.recovered = recovered;
+ this.xa = false;
}
// Delivery implementation ----------------------------------------------------------------------
@@ -96,6 +98,15 @@
{
if (trace) { log.trace(this + " acknowledging delivery " + ( tx == null ? "non-transactionally" : "in " + tx)); }
+ if (tx.getXid() != null)
+ {
+ xa = true;
+ }
+ else
+ {
+ xa = false;
+ }
+
observer.acknowledge(this, tx);
}
@@ -118,6 +129,14 @@
return "Delivery" + (reference == null ? "" : "[" + reference + "]");
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.contract.Delivery#isXA()
+ */
+ public boolean isXA()
+ {
+ return xa;
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2008-12-09 14:11:33 UTC (rev 5487)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java 2008-12-09 14:52:49 UTC (rev 5488)
@@ -3549,5 +3549,215 @@
}
}
}
+
+ /*
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+ */
+ public void testReceivingMessageOfPreparedTransaction() throws Exception
+ {
+ log.trace("starting testReceivingMessageOfPreparedTransaction");
+
+ Connection conn1 = null;
+
+ XAConnection conn2 = null;
+
+ XAConnection conn3 = null;
+
+ XAConnection conn4 = null;
+
+ Connection conn5 = null;
+
+ try
+ {
+ //First send a message to the queue
+ conn1 = cf.createConnection();
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess1.createProducer(queue4);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage tm1 = sess1.createTextMessage("TxM1");
+
+ prod.send(tm1);
+
+ conn2 = cf.createXAConnection();
+
+ XASession sess2 = conn2.createXASession();
+
+ XAResource res1 = sess2.getXAResource();
+
+ //Pretend to be a transaction manager by interacting through the XAResources
+ Xid xid1 = new MessagingXid("tx1".getBytes(), 42, "abcdef".getBytes());
+
+ res1.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons = sess2.createConsumer(queue4);
+
+ conn2.start();
+
+ //Consume the message
+
+ TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+ assertNotNull(rm1);
+
+ assertEquals(tm1.getText(), rm1.getText());
+
+ res1.end(xid1, XAResource.TMSUCCESS);
+
+ //prepare the tx
+
+ res1.prepare(xid1);
+
+ conn1.close();
+
+ conn2.close();
+
+ conn1 = null;
+
+ conn2 = null;
+
+ //Now receive again
+
+ conn3 = cf.createXAConnection();
+
+ XASession sess3 = conn3.createXASession();
+
+ XAResource res3 = sess3.getXAResource();
+
+ Xid xid2 = new MessagingXid("tx2".getBytes(), 42, "ghijkl".getBytes());
+
+ res3.start(xid2, XAResource.TMNOFLAGS);
+
+ MessageConsumer cons3 = sess3.createConsumer(queue4);
+
+ conn3.start();
+
+ //Consume the message
+
+ TextMessage rm3 = (TextMessage)cons3.receive(3000);
+
+ assertNull(rm3);
+
+ res3.end(xid2, XAResource.TMSUCCESS);
+
+ res3.prepare(xid2);
+ res3.commit(xid2, false);
+
+ conn3.close();
+ conn3 = null;
+
+ //now recover the lost message
+ ServerManagement.stopServerPeer();
+
+ ServerManagement.startServerPeer();
+
+ deployAndLookupAdministeredObjects();
+
+ //Now recover
+
+ conn4 = cf.createXAConnection();
+
+ XASession sess4 = conn4.createXASession();
+
+ XAResource res4 = sess4.getXAResource();
+
+ Xid[] xids = res4.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+
+ Xid[] xids2 = res4.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, xids2.length);
+
+ assertEquals(xid1, xids[0]);
+
+ //commit the tx
+
+ res4.commit(xids[0], false);
+
+ //The message should be received
+
+ conn4.close();
+ conn4 = null;
+
+ conn5 = cf.createConnection();
+
+ Session sess5 = conn5.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons5 = sess5.createConsumer(queue4);
+
+ conn5.start();
+
+ Message m = cons5.receive(1000);
+
+ assertNull(m);
+
+ conn5.close();
+ conn5 = null;
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ try
+ {
+ conn1.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn2 != null)
+ {
+ try
+ {
+ conn2.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn3 != null)
+ {
+ try
+ {
+ conn3.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn4 != null)
+ {
+ try
+ {
+ conn4.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+
+ if (conn5 != null)
+ {
+ try
+ {
+ conn5.close();
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
+ }
+ }
+
}
More information about the jboss-cvs-commits
mailing list