[jboss-cvs] JBoss Messaging SVN: r8595 - in branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940: src/main/org/jboss/jms/server and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 8 03:28:01 EST 2013


Author: raggz
Date: 2013-03-08 03:28:00 -0500 (Fri, 08 Mar 2013)
New Revision: 8595

Modified:
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/jms/server/ServerPeer.java
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
   branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
JBMessaging-1940


Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml	2013-03-08 08:28:00 UTC (rev 8595)
@@ -84,6 +84,7 @@
    INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
    SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
    UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+   MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml	2013-03-08 08:28:00 UTC (rev 8595)
@@ -80,6 +80,7 @@
    INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
    SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
    UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+   MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml	2013-03-08 08:28:00 UTC (rev 8595)
@@ -80,6 +80,7 @@
    INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
    SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
    UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+   MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml	2013-03-08 08:28:00 UTC (rev 8595)
@@ -80,6 +80,7 @@
    INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
    SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
    UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+   MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml	2013-03-08 08:28:00 UTC (rev 8595)
@@ -84,6 +84,7 @@
    INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
    SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
    UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+   MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml	2013-03-08 08:28:00 UTC (rev 8595)
@@ -80,6 +80,7 @@
    INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
    SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
    UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+   MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml	2013-03-08 08:28:00 UTC (rev 8595)
@@ -85,6 +85,7 @@
    INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
    SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
    UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+   MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/jms/server/ServerPeer.java	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/jms/server/ServerPeer.java	2013-03-08 08:28:00 UTC (rev 8595)
@@ -274,6 +274,7 @@
          {
          	((JDBCPersistenceManager)persistenceManager).injectNodeID(serverPeerID);
             ((JDBCPersistenceManager)persistenceManager).setStopServerPeerOnDBFailure(this.stopServerPeerOnDBFailure);
+            ((JDBCPersistenceManager)persistenceManager).setServerPeer(this);
          }
          else if (persistenceManager instanceof NullPersistenceManager)
          {

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2013-03-08 08:28:00 UTC (rev 8595)
@@ -22,11 +22,14 @@
 package org.jboss.messaging.core.impl;
 
 import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.tx.MessagingXid;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Binding;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.impl.message.MessageFactory;
 import org.jboss.messaging.core.impl.message.MessageSupport;
 import org.jboss.messaging.core.impl.tx.PreparedTxInfo;
@@ -106,16 +109,12 @@
    
    private boolean supportsTxAge;
    
-   private int maxRetry = 25;
-   
-   private int retryInterval = 1000;
-
-   private boolean retryOnConnectionFailure = false;
-   
    private volatile boolean stopServerPeerOnDBFailure = false;
    
    private Object moveLock = new Object();
    
+   private ServerPeer server;
+   
    // Constructors --------------------------------------------------
 
    public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -1192,6 +1191,14 @@
          public Object doTransaction() throws Exception
          {
             PreparedStatement statement = null;
+            PreparedStatement statement1 = null;
+            PreparedStatement statement2 = null;
+            PreparedStatement statement3 = null;
+            PreparedStatement statement4 = null;
+            ResultSet rs = null;
+            ResultSet rs1 = null;
+            ResultSet rs2 = null;
+
             try
             {
                statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
@@ -1199,14 +1206,106 @@
                statement.setInt(2, fromNodeID);
                int affected = statement.executeUpdate();
 
+               statement.close();
+               statement = null;
+
                log.debug("Merged " + affected + " transactions from channel "
                      + fromNodeID + " into node " + toNodeID);
+               
+               //The following code is for JBMESSAGING-1940
+               if (affected > 0)
+               {
+                  //check non-movable transactions
+                  statement1 = conn.prepareStatement(getSQLStatement("SELECT_PREPARED_TRANSACTIONS"));
 
+                  statement1.setInt(1, nodeID);
+
+                  rs = statement1.executeQuery();
+
+                  List<Long> nonMovableTxs = new ArrayList<Long>();
+
+                  //check '+" messages, if binding is missing, don't merge this tx
+                  String sql = getSQLStatement("SELECT_MESSAGE_ID_FOR_REF");
+                  statement2 = conn.prepareStatement(sql);
+
+                  //check '-' messages, if binding is missing, don't merge this tx
+                  sql = getSQLStatement("SELECT_MESSAGE_ID_FOR_ACK");
+                  statement3 = conn.prepareStatement(sql);
+                  
+                  PostOffice postOffice = server.getPostOfficeInstance();
+
+                  while (rs.next())
+                  {
+                     boolean found = false;
+
+                     long txId = rs.getLong(1);
+
+                     statement2.setLong(1, txId);
+
+                     rs1 = statement2.executeQuery();
+
+                     while (rs1.next())
+                     {
+                        long channelId = rs1.getLong(2);
+                        Binding binding = postOffice.getBindingForChannelID(channelId);
+                        if (binding == null)
+                        {
+                           nonMovableTxs.add(txId);
+                           found = true;
+                           break;
+                        }
+                     }
+                     
+                     rs1.close();
+                     rs1 = null;
+                     if (found) continue;
+
+                     statement3.setLong(1, txId);
+
+                     rs2 = statement3.executeQuery();
+
+                     while (rs2.next())
+                     {
+                        long channelId = rs2.getLong(2);
+                        Binding binding = postOffice.getBindingForChannelID(channelId);
+                        if (binding == null)
+                        {
+                           nonMovableTxs.add(txId);
+                           break;
+                        }
+                     }
+                     rs2.close();
+                     rs2 = null;
+                  }
+
+                  rs.close();
+                  rs = null;
+                  statement2.close();
+                  statement2 = null;
+                  statement3.close();
+                  statement3 = null;
+
+                  if (nonMovableTxs.size() > 0)
+                  {
+                     //put non-movable tx back
+                     statement4 = conn.prepareStatement(getSQLStatement("MOVE_TX_NODE"));
+                     for (Long nonMTxId : nonMovableTxs)
+                     {
+                        statement4.setInt(1, fromNodeID);
+                        statement4.setLong(2, nonMTxId);
+                        statement4.executeUpdate();
+                     }
+                     statement4.close();
+                     statement4 = null;
+                     log.debug("Moved transactions back: " + nonMovableTxs);
+                  }
+               }
                return null;
             }
             finally
             {
-               closeStatement(statement);
+               closeResultSet(rs, rs1, rs2);
+               closeStatement(statement, statement1, statement2, statement3, statement4);
             }
          }
       }
@@ -3091,6 +3190,7 @@
       map.put("SELECT_MESSAGE_ID_FOR_ACK",
               "SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '-' ORDER BY ORD");
       map.put("UPDATE_TX", "UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?");
+      map.put("MOVE_TX_NODE", "UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?");
 
       // Counter
       map.put("UPDATE_COUNTER",
@@ -3613,4 +3713,9 @@
       this.stopServerPeerOnDBFailure = crashServerOnDBFailure;
    }
 
+   public void setServerPeer(ServerPeer server)
+   {
+      this.server = server;
+   }
+
 }

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2013-03-08 08:28:00 UTC (rev 8595)
@@ -252,39 +252,45 @@
       return false;
    }
    
-   protected void closeResultSet(ResultSet rs)
+   protected void closeResultSet(ResultSet... rss)
    {
-   	if (rs != null)
+      for (ResultSet rs : rss)
       {
-         try
+         if (rs != null)
          {
-            rs.close();
+            try
+            {
+               rs.close();
+            }
+            catch (Throwable e)
+            {
+               if (trace)
+               {
+                  log.trace("Failed to close result set", e);
+               }
+            }
          }
-         catch (Throwable e)
-         {
-         	if (trace)
-         	{
-         		log.trace("Failed to close result set", e);
-         	}
-         }
       }
    }
    
-   protected void closeStatement(Statement st)
+   protected void closeStatement(Statement... sts)
    {
-   	if (st != null)
+      for (Statement st : sts)
       {
-         try
+   	   if (st != null)
          {
-         	st.close();
+            try
+            {
+         	   st.close();
+            }
+            catch (Throwable e)
+            {
+         	   if (trace)
+         	   {
+         		   log.trace("Failed to close statement", e);
+         	   }
+            }
          }
-         catch (Throwable e)
-         {
-         	if (trace)
-         	{
-         		log.trace("Failed to close statement", e);
-         	}
-         }
       }
    }
    

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2013-03-08 08:28:00 UTC (rev 8595)
@@ -17,6 +17,7 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.ServerSession;
 import javax.jms.ServerSessionPool;
 import javax.jms.Session;
@@ -1185,6 +1186,187 @@
       }
    }
 
+   /**
+    * deploy a distributed queue on only one node and send some messages ,
+    * receive the messages within a XA transaction, crash the node after 
+    * tx prepared. Then simulate XA recovery on the other node. The expected
+    * behavior is that the recovery won't pick up the messages, which are
+    * only able to be recovered on the original node.
+    * https://jira.jboss.org/jira/browse/JBMESSAGING-1940
+    */
+   public void testRecoveryWithSingleDistributedTargetWithMessage() throws Exception
+   {
+      Connection conn = null;
+      XAConnection xaconn = null;
+
+      try
+      {
+         ServerManagement.deployQueue(false, "singleDeployedNonDistributedQueue", 1);
+         Queue singleNonClusteredQueue = (Queue)ic[1].lookup("queue/singleDeployedNonDistributedQueue");
+
+         conn = createConnectionOnServer(cf, 1);
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer prod = session.createProducer(singleNonClusteredQueue);
+
+         int num = 10;
+         for (int i = 0; i < num; i++)
+         {
+            TextMessage m = session.createTextMessage("TX1-" + i);
+            prod.send(m);
+         }
+
+         xaconn = this.createXAConnectionOnServer(cf,  1);
+         xaconn.start();
+         
+         XASession xasess = xaconn.createXASession();
+         XAResource res = xasess.getXAResource();
+
+         //tx - receive - prepared
+         Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+         res.start(xid1, XAResource.TMNOFLAGS);
+
+         MessageConsumer consumer = xasess.createConsumer(singleNonClusteredQueue);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = consumer.receive(5000);
+            assertNotNull(m);
+         }
+
+         res.end(xid1, XAResource.TMSUCCESS);
+         
+         res.prepare(xid1);
+
+         //tx - send - prepared
+         Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli2".getBytes());
+
+         res.start(xid2, XAResource.TMNOFLAGS);
+
+         MessageProducer producer = xasess.createProducer(singleNonClusteredQueue);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = xasess.createTextMessage("TX2-" + i);
+            producer.send(m);
+         }
+
+         res.end(xid2, XAResource.TMSUCCESS);
+         
+         res.prepare(xid2);
+
+         //normal tx on a distributed queue
+         Xid xid3 = new MessagingXid("bq3".getBytes(), 42, "eemeli3".getBytes());
+
+         res.start(xid3, XAResource.TMNOFLAGS);
+
+         producer = xasess.createProducer(queue[1]);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = xasess.createTextMessage("TX3-" + i);
+            producer.send(m);
+         }
+
+         res.end(xid3, XAResource.TMSUCCESS);
+         
+         res.prepare(xid3);
+         
+         xaconn.close();
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+         ServerManagement.kill(1);
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(30000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         
+         xaconn = createXAConnectionOnServer(cf,  0);
+         xaconn.start();
+         
+         XAResource recoveryRes = xaconn.createXASession().getXAResource();
+         Xid[] txs = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+         
+         //the transactions xid1 and xid2 shouldn't be recovered on node 0, because 
+         //its messages stay at node 1.
+         assertEquals(1, txs.length);
+
+         assertTrue(txs[0].equals(xid3));
+         
+         //commit
+         recoveryRes.commit(txs[0], false);
+
+         txs = recoveryRes.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, txs.length);
+         
+         //now receive messages
+         Session sess = xaconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer = sess.createConsumer(queue[0]);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = consumer.receive(5000);
+            assertNotNull(m);
+         }
+
+         xaconn.close();
+         
+         //simulate restarting node 1, do not clean db
+         ServerManagement.start(1, config, overrides, false);
+
+         ServerManagement.deployQueue(false, "singleDeployedNonDistributedQueue", 1);
+         ic[1] = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+
+         singleNonClusteredQueue = (Queue)ic[1].lookup("queue/singleDeployedNonDistributedQueue");
+
+         xaconn = createXAConnectionOnServer(cf,  1);
+         xaconn.start();
+         
+         recoveryRes = xaconn.createXASession().getXAResource();
+         txs = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+         
+         //now the tx should get recovered.
+         assertEquals(2, txs.length);
+         
+         recoveryRes.commit(txs[0], false);
+         recoveryRes.commit(txs[1], false);
+         
+         txs = recoveryRes.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, txs.length);
+         
+         //receive message on xid2
+         sess = xaconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         consumer = sess.createConsumer(singleNonClusteredQueue);
+         for (int i = 0; i < num; i++)
+         {
+            Message m = consumer.receive(5000);
+            assertNotNull(m);
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (xaconn != null)
+         {
+            xaconn.close();
+         }
+      }
+   }
+
    
    // Inner classes --------------------------------------------------------------------------------
 

Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2013-03-08 08:28:00 UTC (rev 8595)
@@ -1004,6 +1004,15 @@
    }
 
    /**
+    * Simulates a queue deployment
+    */
+   public static void deployQueue(boolean clustered, String name, int serverIndex) throws Exception
+   {
+      insureStarted(serverIndex);
+      servers[serverIndex].getServer().deployQueue(name, null, clustered);
+   }
+
+   /**
     * Simulates a queue deployment (copying the queue descriptor in the deploy directory).
     */
    public static void deployQueue(String name) throws Exception



More information about the jboss-cvs-commits mailing list