[jboss-cvs] JBoss Messaging SVN: r5488 - in branches/Branch_1_4: src/main/org/jboss/messaging/core/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 9 09:52:49 EST 2008


Author: gaohoward
Date: 2008-12-09 09:52:49 -0500 (Tue, 09 Dec 2008)
New Revision: 5488

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
Log:
JBMESSAGING-1440


Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java	2008-12-09 14:11:33 UTC (rev 5487)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java	2008-12-09 14:52:49 UTC (rev 5488)
@@ -48,4 +48,10 @@
    void cancel() throws Throwable;   
    
    boolean isRecovered();
+
+   /**
+    * Indicates whether this delivery is associated with an XA transaction.
+    * The flag is propagated from the receiver.
+    */
+   boolean isXA();
 }

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-12-09 14:11:33 UTC (rev 5487)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-12-09 14:52:49 UTC (rev 5488)
@@ -288,6 +288,13 @@
       acknowledgeInternal(d, null, false);
    }
 
+   /*
+    * Note: if an XA transaction has not been committed when a failure happens,
+    * cancelling the delivery must not put its message back for redelivery.
+    * The message must stay in the DB until transaction recovery takes place.
+    * 
+    * @see org.jboss.messaging.core.contract.DeliveryObserver#cancel(org.jboss.messaging.core.contract.Delivery)
+    */
    public void cancel(Delivery del) throws Throwable
    {
       //We may need to update the delivery count in the database
@@ -306,7 +313,13 @@
 
       if (!checkAndSchedule(ref))
       {
-         cancelInternal(ref);
+         /*
+          * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+          */
+         if (!del.isXA()) 
+         {
+            cancelInternal(ref);
+         }
       }
    }
 

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java	2008-12-09 14:11:33 UTC (rev 5487)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java	2008-12-09 14:52:49 UTC (rev 5488)
@@ -50,6 +50,7 @@
    private DeliveryObserver observer;
    private MessageReference reference;   
    private boolean recovered;
+   private boolean xa;
 
    private boolean trace = log.isTraceEnabled();
 
@@ -73,6 +74,7 @@
       this.observer = observer;
       this.selectorAccepted = selectorAccepted;
       this.recovered = recovered;
+      this.xa = false;
    }
 
    // Delivery implementation ----------------------------------------------------------------------
@@ -96,6 +98,15 @@
    {        
       if (trace) { log.trace(this + " acknowledging delivery " + ( tx == null ? "non-transactionally" : "in " + tx)); }
       
+      // tx is null for a non-transactional acknowledgement (the trace statement
+      // above already allows for that), so it has to be checked before getXid()
+      // is called. Only a transaction that carries an Xid counts as XA.
+      xa = false;
+      if (tx != null && tx.getXid() != null)
+      {
+         xa = true;
+      }
+      
       observer.acknowledge(this, tx);
    }
 
@@ -118,6 +129,14 @@
       return "Delivery" + (reference == null ? "" : "[" + reference + "]");
    }
 
+   /* (non-Javadoc) Returns the xa flag: false after construction, updated on acknowledgement.
+    * @see org.jboss.messaging.core.contract.Delivery#isXA()
+    */
+   public boolean isXA()
+   {
+      return xa;
+   }
+
    // Package protected ----------------------------------------------------------------------------
    
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2008-12-09 14:11:33 UTC (rev 5487)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2008-12-09 14:52:49 UTC (rev 5488)
@@ -3549,5 +3549,215 @@
          }    
       }
    }
+
+   /* Checks that a message received in a prepared (not yet committed) XA tx is not redelivered,
+    * and that the tx can still be recovered: https://jira.jboss.org/jira/browse/JBMESSAGING-1440
+    */
+   public void testReceivingMessageOfPreparedTransaction() throws Exception
+   {
+      log.trace("starting testReceivingMessageOfPreparedTransaction");
+
+      Connection conn1 = null;
+
+      XAConnection conn2 = null;
+
+      XAConnection conn3 = null;
+
+      XAConnection conn4 = null;
+
+      Connection conn5 = null;
+
+      try
+      {
+         //First send a persistent message to the queue, outside any transaction
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess1.createProducer(queue4);
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+         TextMessage tm1 = sess1.createTextMessage("TxM1");
+
+         prod.send(tm1);
+
+         conn2 = cf.createXAConnection();
+
+         XASession sess2 = conn2.createXASession();
+
+         XAResource res1 = sess2.getXAResource();
+
+         //Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("tx1".getBytes(), 42, "abcdef".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = sess2.createConsumer(queue4);
+
+         conn2.start();
+
+         //Consume the message inside the XA transaction xid1
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         //Prepare the tx, but do not commit it
+
+         res1.prepare(xid1);
+
+         conn1.close();
+
+         conn2.close();
+
+         conn1 = null;
+
+         conn2 = null;
+
+         //Now try to receive again, using a new XA connection and a second transaction
+
+         conn3 = cf.createXAConnection();
+
+         XASession sess3 = conn3.createXASession();
+
+         XAResource res3 = sess3.getXAResource();
+         
+         Xid xid2 = new MessagingXid("tx2".getBytes(), 42, "ghijkl".getBytes());
+
+         res3.start(xid2, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons3 = sess3.createConsumer(queue4);
+
+         conn3.start();
+
+         //Nothing may be delivered here: the message still belongs to the prepared xid1
+
+         TextMessage rm3 = (TextMessage)cons3.receive(3000);
+
+         assertNull(rm3);
+
+         res3.end(xid2, XAResource.TMSUCCESS);
+         
+         res3.prepare(xid2);
+         res3.commit(xid2, false);
+
+         conn3.close();
+         conn3 = null;
+         
+         //Restart the server peer so that the prepared tx has to be recovered
+         ServerManagement.stopServerPeer();
+
+         ServerManagement.startServerPeer();
+
+         deployAndLookupAdministeredObjects();
+
+         //Now recover: exactly one prepared tx is expected, and it must be xid1
+
+         conn4 = cf.createXAConnection();
+
+         XASession sess4 = conn4.createXASession();
+
+         XAResource res4 = sess4.getXAResource();
+
+         Xid[] xids = res4.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = res4.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+
+         assertEquals(xid1, xids[0]);
+
+         //Commit the recovered tx
+
+         res4.commit(xids[0], false);
+
+         //Once xid1 is committed the message must not be delivered again (assertNull below)
+
+         conn4.close();
+         conn4 = null;
+
+         conn5 = cf.createConnection();
+
+         Session sess5 = conn5.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons5 = sess5.createConsumer(queue4);
+
+         conn5.start();
+
+         Message m = cons5.receive(1000);
+
+         assertNull(m);
+         
+         conn5.close();
+         conn5 = null;
+         
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore: best-effort cleanup
+            }
+         }
+
+         if (conn2 != null)
+         {
+            try
+            {
+               conn2.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore: best-effort cleanup
+            }
+         }
+
+         if (conn3 != null)
+         {
+            try
+            {
+               conn3.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore: best-effort cleanup
+            }
+         }
+
+         if (conn4 != null)
+         {
+            try
+            {
+               conn4.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore: best-effort cleanup
+            }
+         }
+
+         if (conn5 != null)
+         {
+            try
+            {
+               conn5.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore: best-effort cleanup
+            }
+         }
+      }
+   }
+
 }
 




More information about the jboss-cvs-commits mailing list