[jboss-cvs] JBoss Messaging SVN: r5027 - in branches/Branch_1_4: src/main/org/jboss/messaging/core/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Sep 25 08:48:01 EDT 2008


Author: ataylor
Date: 2008-09-25 08:48:01 -0400 (Thu, 25 Sep 2008)
New Revision: 5027

Modified:
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1282 - we always add a tx to the database even if no persistent messages are sent/received.

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2008-09-25 12:48:01 UTC (rev 5027)
@@ -85,6 +85,8 @@
 
    boolean idExists(String messageID) throws Exception;
 
+   void addTransaction(Transaction tx);
+
    // Interface value classes ----------------------------------------------------------------------
 
    class MessageChannelPair

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2008-09-25 12:48:01 UTC (rev 5027)
@@ -255,6 +255,10 @@
 
                pm.addReference(channelID, ref, tx);
             }
+            else if(recoverable)
+            {
+               pm.addTransaction(tx);
+            }
          }
 
          messagesAdded.increment();
@@ -760,6 +764,10 @@
          {
             pm.removeReference(channelID, d.getReference(), tx);
          }
+         else if(recoverable)
+         {
+            pm.addTransaction(tx);
+         }
       }
    }
 

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2008-09-25 12:48:01 UTC (rev 5027)
@@ -1556,6 +1556,11 @@
       return false;
    }
 
+   public void addTransaction(Transaction tx)
+   {
+      //this forces the tx to be added
+      getCallback(tx);
+   }
 
 
    // Public --------------------------------------------------------
@@ -1938,11 +1943,11 @@
 
             try
             {
-               // Insert the tx record
-               if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
-               {
+               // Insert the tx record even if there are no refs as we need it for recovery
+               //if (!refsToAdd.isEmpty() || !refsToRemove.isEmpty())
+               //{
                   addTXRecord(conn, tx);
-               }
+               //}
 
                boolean first = false;
 

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2008-09-25 12:48:01 UTC (rev 5027)
@@ -115,6 +115,11 @@
       // NOOP
    }
 
+   public void addTransaction(Transaction tx)
+   {
+      // NOOP
+   }
+
    public long reserveIDBlock(String counterName, int size) throws Exception
    {
       checkServerID();

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2008-09-25 08:58:27 UTC (rev 5026)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/XARecoveryTest.java	2008-09-25 12:48:01 UTC (rev 5027)
@@ -21,15 +21,7 @@
   */
 package org.jboss.test.messaging.jms;
 
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.XAConnection;
-import javax.jms.XASession;
+import javax.jms.*;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
@@ -86,7 +78,317 @@
 
 
    // Public --------------------------------------------------------
-   
+        /* A simple acknowledgement in a transaction using NP messages, recovered with restart */
+   public void testSimpleTransactionalAcknowledgementNPRecoveryWithRestart() throws Exception
+   {
+      log.trace("starting testSimpleTransactionalAcknowledgementRecoveryWithRestart");
+
+      Connection conn1 = null;
+
+      XAConnection conn2 = null;
+
+      XAConnection conn3 = null;
+
+      try
+      {
+         //First send a message to the queue
+         conn1 = cf.createConnection();
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess1.createProducer(queue4);
+         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+
+         prod.send(tm1);
+
+         conn2 = cf.createXAConnection();
+
+         XASession sess2 = conn2.createXASession();
+
+         XAResource res1 = sess2.getXAResource();
+
+         //Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer cons = sess2.createConsumer(queue4);
+
+         conn2.start();
+
+         //Consume the message
+
+         TextMessage rm1 = (TextMessage)cons.receive(1000);
+
+         assertNotNull(rm1);
+
+         assertEquals(tm1.getText(), rm1.getText());
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         //prepare the tx
+
+         res1.prepare(xid1);
+
+         conn1.close();
+
+         conn2.close();
+
+         conn1 = null;
+
+         conn2 = null;
+
+         // Now "crash" the server
+
+         ServerManagement.stopServerPeer();
+
+         ServerManagement.startServerPeer();
+
+         deployAndLookupAdministeredObjects();
+
+         //Now recover
+
+         conn3 = cf.createXAConnection();
+
+         XASession sess3 = conn3.createXASession();
+
+         XAResource res3 = sess3.getXAResource();
+
+         Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = res3.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+
+         assertEquals(xid1, xids[0]);
+
+         //Commit the tx
+
+         res3.commit(xids[0], false);
+
+         //The message should be acknowledged
+
+         conn3.close();
+
+         conn1 = cf.createConnection();
+
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(queue4);
+
+         conn1.start();
+
+         Message m = cons1.receive(1000);
+
+         assertNull(m);
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+
+         if (conn2 != null)
+         {
+            try
+            {
+               conn2.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+
+         if (conn3 != null)
+         {
+            try
+            {
+               conn3.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+      }
+   }
+
+
+   /* A simple send in a transaction using NP messages - recovered after restarting the server */
+   public void testSimpleTransactionalSendNPRecoveryWithRestart() throws Exception
+   {
+      log.trace("starting testSimpleTransactionalDeliveryRecoveryWithRestart");
+
+      XAConnection conn1 = null;
+
+      Connection conn2 = null;
+
+      XAConnection conn3 = null;
+
+      try
+      {
+         conn1 = cf.createXAConnection();
+
+         XASession sess1 = conn1.createXASession();
+
+         XAResource res1 = sess1.getXAResource();
+
+         //Pretend to be a transaction manager by interacting through the XAResources
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         log.trace("Sending message");
+
+         //Send message in tx
+
+         res1.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageProducer prod1 = sess1.createProducer(queue4);
+
+         prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         TextMessage tm1 = sess1.createTextMessage("tm1");
+         TextMessage tm2 = sess1.createTextMessage("tm2");
+
+         prod1.send(tm1);
+         prod1.send(tm2);
+
+         res1.end(xid1, XAResource.TMSUCCESS);
+
+         log.trace("Sent message");
+
+         //prepare tx
+
+         res1.prepare(xid1);
+
+         log.trace("prepared tx");
+
+         conn2 = cf.createConnection();
+
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons2 = sess2.createConsumer(queue4);
+
+         conn2.start();
+
+         //Verify message can't be received
+
+         Message m = cons2.receive(1000);
+
+         assertNull(m);
+
+         conn1.close();
+
+         conn2.close();
+
+         conn1 = null;
+
+         conn2 = null;
+
+         // Now "crash" the server
+
+         ServerManagement.stopServerPeer();
+
+         ServerManagement.startServerPeer();
+
+         deployAndLookupAdministeredObjects();
+
+         conn3 = cf.createXAConnection();
+
+         XASession sess3 = conn3.createXASession();
+
+         XAResource res3 = sess3.getXAResource();
+
+         log.trace("created connection");
+
+         Xid[] xids = res3.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = res3.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+
+         assertEquals(xid1, xids[0]);
+
+         log.trace("recovered");
+
+         conn2 = cf.createConnection();
+
+         sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         cons2 = sess2.createConsumer(queue4);
+
+         conn2.start();
+
+         // Verify message still can't be received
+
+         m = cons2.receive(1000);
+
+         assertNull(m);
+
+         log.trace("still can't see message");
+
+         //Commit the tx
+
+         res3.commit(xids[0], false);
+
+         log.trace("committed");
+
+         //No message should be received even after the commit (presumably the non-persistent messages did not survive the restart)
+
+         TextMessage rm1 = (TextMessage)cons2.receive(1000);
+
+         assertNull(rm1);
+
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            try
+            {
+               conn1.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+
+         if (conn2 != null)
+         {
+            try
+            {
+               conn2.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+
+         if (conn3 != null)
+         {
+            try
+            {
+               conn3.close();
+            }
+            catch (Exception e)
+            {
+               //Ignore
+            }
+         }
+      }
+   }
    /*
     * In this test, we have two queues, each with four messages already in them.
     * 




More information about the jboss-cvs-commits mailing list