[jboss-cvs] JBoss Messaging SVN: r2313 - in trunk: src/main/org/jboss/jms/tx and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Feb 14 15:25:03 EST 2007


Author: timfox
Date: 2007-02-14 15:25:03 -0500 (Wed, 14 Feb 2007)
New Revision: 2313

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Modified:
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/tx/ResourceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java
   trunk/src/main/org/jboss/messaging/core/tx/Transaction.java
   trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
   trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-734


Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -1444,15 +1444,32 @@
       {
          if (key.equals(DefaultClusteredPostOffice.FAILED_OVER_FOR_KEY))
          {
-            // We have a failover status change - notify anyone waiting
-
-            log.debug(ServerPeer.this + ".FailoverListener got failover event, notifying those waiting on lock");
-
+            if (updatedReplicantMap != null && originatingNodeId == serverPeerID)
+            {
+               FailoverStatus status = (FailoverStatus)updatedReplicantMap.get(new Integer(serverPeerID));
+               
+               if (status != null && status.isFailingOver())
+               {                     
+                  //We prompt txRepository to load any prepared txs - so we can take over responsibility for
+                  //in doubt transactions from other nodes
+                  try
+                  {
+                     txRepository.loadPreparedTransactions();
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to load prepared transactions", e);
+                  }
+               }
+            }
+            
             synchronized (failoverStatusLock)
             {
+               log.debug(ServerPeer.this + ".FailoverListener got failover event, notifying those waiting on lock");
+               
                failoverStatusLock.notifyAll();
             }
-         }
+         }         
       }      
    }
 }

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -533,6 +533,8 @@
          {
             Xid[] txs = conn.getPreparedTransactions();
             
+            if (trace) { log.trace("Got " + txs.length + " transactions from server"); }
+            
             //populate with TxState --MK
             for (int i = 0; i < txs.length;i++)
             {

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java	2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -102,6 +102,11 @@
    {
       return failingOver && currentlyFailingOverForNode == nodeId;
    }
+   
+   public boolean isFailingOver()
+   {
+      return failingOver;
+   }
 
    public String toString()
    {

Modified: trunk/src/main/org/jboss/messaging/core/tx/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/Transaction.java	2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/messaging/core/tx/Transaction.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -62,9 +62,8 @@
    
    private Map callbackMap;
    
-   private boolean loadedAtStartup;
-      
-   
+   private boolean recoveredFromStorage;
+         
    /**
     * If this is a XA transaction, when a commit is executed the transaction has to be removed from the transaction repository.
     * This reference will guarantee the reference back to the repository where the transaction was created
@@ -182,6 +181,14 @@
       {
          throw new TransactionException("Transaction already rolled back, cannot commit");
       }
+      
+      if (recoveredFromStorage)
+      {
+         //Commit can come in for an in doubt tx that has been recovered from storage
+         //but for which recover() has not yet been called
+         //therefore we might need to load it's state
+         loadState();
+      }
 
       if (trace) { log.trace(this + " executing before commit hooks"); }
        
@@ -292,6 +299,14 @@
          throw new TransactionException("Transaction already rolled back, cannot rollback");
       }
       
+      if (recoveredFromStorage)
+      {
+         //Commit can come in for an in doubt tx that has been recovered from storage
+         //but for which recover() has not yet been called
+         //therefore we might need to load it's state
+         loadState();
+      }
+      
       if (trace) { log.trace(this + " executing before rollback hooks"); }
       
       boolean onePhase = state != STATE_PREPARED;
@@ -335,6 +350,15 @@
       if (trace) { log.trace(this + " rollback process complete"); }
    }
 
+   public void loadState() throws Exception
+   {
+      repository.handleReferences(this);
+      
+      repository.handleAcks(this);
+      
+      recoveredFromStorage = false;
+   }
+   
    public synchronized void setRollbackOnly() throws Exception
    {
       if (trace) { log.trace("setting ROLLBACK_ONLY on " + this); }
@@ -347,14 +371,14 @@
       return id;
    }
    
-   public boolean isLoadedAtStartup()
+   public boolean isRecoveredFromStorage()
    {
-      return this.loadedAtStartup;
+      return this.recoveredFromStorage;
    }
    
-   public void setLoadedAtStartup(boolean loadedAtStartup)
+   public void setRecoveredFromStorage(boolean recoveredFromStorage)
    {
-      this.loadedAtStartup = loadedAtStartup;
+      this.recoveredFromStorage = recoveredFromStorage;
    }
    
    public void setState(int state)

Modified: trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -128,11 +128,11 @@
       ArrayList prepared = new ArrayList();
 
       Iterator iter = globalToLocalMap.values().iterator();
-
+      
       while (iter.hasNext())
       {
          Transaction tx = (Transaction) iter.next();
-
+         
          if (tx.getXid() != null && tx.getState() == Transaction.STATE_PREPARED)
          {
             try
@@ -143,13 +143,10 @@
                //in which case the tx will already have the references and acks in them
                //in this case we DO NOT want to replay them again, since they will end up in the transaction state
                //twice
-               //In other words we only want to replay acks and sends if this tx was loaded at startup
-               if (tx.isLoadedAtStartup())
+               //In other words we only want to replay acks and sends if this tx was recovered from the db
+               if (tx.isRecoveredFromStorage())
                {
-                  handleReferences(tx);
-                  handleAcks(tx);
-                  
-                  tx.setLoadedAtStartup(false);
+                  tx.loadState();
                }
             }
             catch (Exception e)
@@ -160,6 +157,8 @@
             prepared.add(tx.getXid());
          }
       }
+      
+      if (trace) { log.trace("Returning " + prepared.size() + " transactions"); }
 
       return prepared;
    }
@@ -186,16 +185,26 @@
          {
             PreparedTxInfo txInfo = (PreparedTxInfo) iter.next();
 
-            if (trace) log.trace("reinstating TX(XID: " + txInfo.getXid() + ", LocalId " + txInfo.getTxId() +")");
+            //This method may be called more than once - e.g. when failover occurs so we don't want to add the
+            //prepared tx if it is already in memory
             
-            Transaction tx = createTransaction(txInfo);
-            
-            tx.setState(Transaction.STATE_PREPARED);
-            
-            tx.setLoadedAtStartup(true);
-            
+            if (!globalToLocalMap.containsKey(txInfo.getXid()))
+            {
+               Transaction tx = createTransaction(txInfo);
+               
+               tx.setState(Transaction.STATE_PREPARED);
+               
+               tx.setRecoveredFromStorage(true);
+               
+               if (trace) log.trace("reinstating TX(XID: " + txInfo.getXid() + ", LocalId " + txInfo.getTxId() +")");
+               
+            }  
+            else
+            {
+               if (trace) log.trace("Not adding to map since it's already in map");
+            }
          }
-      }
+      }     
    }
    
    public List getPreparedTransactions()
@@ -272,22 +281,20 @@
 	  return this.globalToLocalMap.size();   
    }
    
+   
+   
    // Package protected ---------------------------------------------
    
-   // Protected -----------------------------------------------------         
-   
-   // Private -------------------------------------------------------
-
-	/**
-	 * Load the references and invoke the channel to handle those refs
-	 */
-	private void handleReferences(Transaction tx) throws Exception
+   /**
+    * Load the references and invoke the channel to handle those refs
+    */
+   void handleReferences(Transaction tx) throws Exception
    {
       if (trace) log.trace("Handle references for TX(XID: " + tx.getXid() + ", LocalID: " + tx.getId()+ "):");
 
       long txId = tx.getId();
 
-		List pairs = persistenceManager.getMessageChannelPairRefsForTx(txId);
+      List pairs = persistenceManager.getMessageChannelPairRefsForTx(txId);
 
       if (trace) log.trace("Found " + pairs.size() + " unhandled references.");
 
@@ -327,13 +334,13 @@
                ref.releaseMemoryReference();
             }
          }
-		}
-	}
+      }
+   }
 
-	/**
-	 * Load the acks and acknowledge them
-	 */
-	private void handleAcks(Transaction tx) throws Exception
+   /**
+    * Load the acks and acknowledge them
+    */
+   void handleAcks(Transaction tx) throws Exception
    {
       long txId = tx.getId();
       
@@ -401,7 +408,13 @@
          tx.addCallback(new CancelCallback(dels), this);
       }
       
-   }         
+   }
+   
+   // Protected -----------------------------------------------------         
+   
+   // Private -------------------------------------------------------
+
+	         
       
 	/**
 	 * Creates a prepared transaction

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -1926,36 +1926,5 @@
    }
 
    // Inner classes --------------------------------------------------------------------------------
-
-   private class SimpleFailoverListener implements FailoverListener
-   {
-      private LinkedQueue buffer;
-
-      public SimpleFailoverListener()
-      {
-         buffer = new LinkedQueue();
-      }
-
-      public void failoverEventOccured(FailoverEvent event)
-      {
-         try
-         {
-            buffer.put(event);
-         }
-         catch(InterruptedException e)
-         {
-            throw new RuntimeException("Putting thread interrupted while trying to add event " +
-               "to buffer", e);
-         }
-      }
-
-      /**
-       * Blocks until a FailoverEvent is available or timeout occurs, in which case returns null.
-       */
-      public FailoverEvent getEvent(long timeout) throws InterruptedException
-      {
-         return (FailoverEvent)buffer.poll(timeout);
-      }
-   }
-
+   
 }

Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -0,0 +1,949 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
+import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
+
+/**
+ * 
+ * A XAFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class XAFailoverTest extends ClusteringTestBase
+{
+   // Constants ------------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private ServiceContainer sc;
+   
+   private TransactionManager tm;
+   
+   private Transaction suspended;
+   
+   // Constructors ---------------------------------------------------------------------------------
+
+   public XAFailoverTest(String name)
+   {
+      super(name);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+   
+   protected void setUp() throws Exception
+   {
+      nodeCount = 2;
+
+      super.setUp();
+
+      sc = new ServiceContainer("transaction");
+      
+      //Don't drop the tables again!
+      sc.start(false);
+   
+      InitialContext localIc = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
+           
+      tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
+      
+      suspended = tm.suspend();
+      
+      log.debug("setup done");
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      
+      sc.stop();
+      
+      if (suspended != null)
+      {
+         tm.resume(suspended);
+      }
+   }
+
+   public void testSimpleXAConnectionFailover() throws Exception
+   {
+      XAConnection conn = null;
+      
+      XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+      try
+      {
+         // skip connection to node 0
+         conn = xaCF.createXAConnection();
+         conn.close();
+
+         // create a connection to node 1
+         conn = xaCF.createXAConnection();
+         conn.start();
+
+         assertEquals(1, ((JBossConnection)conn).getServerID());
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         log.info("failover completed");
+
+         assertEquals(0, ((JBossConnection)conn).getServerID());
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+
+   public void testSendFailBeforePrepare() throws Exception
+   {
+      XAConnection xaConn = null;
+      
+      XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+      
+      Connection conn = null;
+      
+      try
+      {
+         // skip connection to node 0
+         xaConn = xaCF.createXAConnection();
+         xaConn.close();
+
+         // create a connection to node 1
+         xaConn = xaCF.createXAConnection();
+         
+         assertEquals(1, ((JBossConnection)xaConn).getServerID());
+
+         conn = cf.createConnection();
+         conn.close();
+         conn = cf.createConnection();         
+         
+         assertEquals(1, ((JBossConnection)conn).getServerID());
+         
+         conn.start();
+
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)xaConn).registerFailoverListener(failoverListener);
+         
+         // Create a normal consumer on the queue
+         Session sessRec = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons = sessRec.createConsumer(queue[1]);
+         
+         // Create an XA session
+         
+         XASession sess = xaConn.createXASession();
+         
+         XAResource res = sess.getXAResource();
+         
+         MessageProducer prod = sess.createProducer(queue[1]);
+         
+         tm.begin();
+         
+         Transaction tx = tm.getTransaction();
+         
+         tx.enlistResource(res);
+         
+         //Enlist a dummy XAResource to force 2pc
+         XAResource dummy = new DummyXAResource();
+         
+         tx.enlistResource(dummy);
+         
+         //Send a message
+         
+         TextMessage msg = sess.createTextMessage("Cupid stunt");
+         
+         prod.send(msg);
+         
+         //Make sure message can't be received
+         
+         Message m = cons.receive(2000);
+         
+         assertNull(m);
+         
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         
+         tx.delistResource(dummy, XAResource.TMSUCCESS);
+         
+         //Now kill node 1
+         
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         log.info("failover completed");
+         
+         //Now commit the transaction
+         
+         tm.commit();
+         
+         // Message should now be receivable
+         
+         TextMessage mrec = (TextMessage)cons.receive(2000);
+         
+         assertNotNull(mrec);
+         
+         assertEquals(msg.getText(), mrec.getText());
+         
+         m = cons.receive(2000);
+         
+         assertNull(m);
+
+         assertEquals(0, ((JBossConnection)xaConn).getServerID());
+
+      }
+      finally
+      {
+         if (xaConn != null)
+         {
+            xaConn.close();
+         }
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   
+   public void testSendAndReceiveFailBeforePrepare() throws Exception
+   {
+      XAConnection xaConn = null;
+      
+      XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+      
+      Connection conn = null;
+      
+      try
+      {
+         // skip connection to node 0
+         xaConn = xaCF.createXAConnection();
+         xaConn.close();
+
+         // create a connection to node 1
+         xaConn = xaCF.createXAConnection();
+         
+         assertEquals(1, ((JBossConnection)xaConn).getServerID());
+
+         conn = cf.createConnection();
+         conn.close();
+         conn = cf.createConnection();         
+         
+         assertEquals(1, ((JBossConnection)conn).getServerID());
+         
+         conn.start();
+         
+         xaConn.start();
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)xaConn).registerFailoverListener(failoverListener);
+         
+         // Create a normal consumer on the queue
+         Session sessRec = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         //Send a message to the queue
+         MessageProducer prod = sessRec.createProducer(queue[1]);
+         
+         TextMessage sent = sessRec.createTextMessage("plop");
+         
+         prod.send(sent);
+         
+         // Create an XA session
+         
+         XASession sess = xaConn.createXASession();
+         
+         XAResource res = sess.getXAResource();
+         
+         MessageProducer prod2 = sess.createProducer(queue[1]);
+         
+         MessageConsumer cons2 = sess.createConsumer(queue[1]);
+         
+         tm.begin();
+         
+         Transaction tx = tm.getTransaction();
+         
+         tx.enlistResource(res);
+         
+         //Enlist a dummy XAResource to force 2pc
+         XAResource dummy = new DummyXAResource();        
+         
+         tx.enlistResource(dummy);
+         
+         //receive a message
+         
+         TextMessage received = (TextMessage)cons2.receive(2000);
+         
+         assertNotNull(received);
+         
+         assertEquals(sent.getText(), received.getText());
+         
+         //Send a message
+         
+         TextMessage msg = sess.createTextMessage("Cupid stunt");
+         
+         prod2.send(msg);
+         
+         // Make sure can't be received
+         
+         MessageConsumer cons = sessRec.createConsumer(queue[1]);
+         
+         Message m = cons.receive(2000);
+         
+         assertNull(m);
+                  
+         tx.delistResource(res, XAResource.TMSUCCESS);
+         
+         tx.delistResource(dummy, XAResource.TMSUCCESS);
+         
+         //Now kill node 1
+         
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         log.info("failover completed");
+         
+         //Now commit the transaction
+         
+         tm.commit();
+         
+         // Message should now be receivable
+         
+         cons2.close();
+         
+         TextMessage mrec = (TextMessage)cons.receive(2000);
+         
+         assertNotNull(mrec);
+         
+         assertEquals(msg.getText(), mrec.getText());
+         
+         m = cons.receive(2000);
+         
+         //And the other message should be acked
+         assertNull(m);                  
+
+         assertEquals(0, ((JBossConnection)xaConn).getServerID());
+
+      }
+      finally
+      {
+         if (xaConn != null)
+         {
+            xaConn.close();
+         }
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   
+   public void testSendAndReceiveTwoConnectionsFailBeforePrepare() throws Exception
+   {
+      XAConnection xaConn0 = null;
+      
+      XAConnection xaConn1 = null;
+      
+      XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+      
+      Connection conn0 = null;
+      
+      Connection conn1 = null;
+      
+      try
+      {
+         xaConn0 = xaCF.createXAConnection();
+         
+         assertEquals(0, ((JBossConnection)xaConn0).getServerID());
+
+         xaConn1 = xaCF.createXAConnection();
+         
+         assertEquals(1, ((JBossConnection)xaConn1).getServerID());
+
+         conn0 = cf.createConnection();
+         
+         assertEquals(0, ((JBossConnection)conn0).getServerID());
+                           
+         conn1 = cf.createConnection();         
+         
+         assertEquals(1, ((JBossConnection)conn1).getServerID());
+         
+         xaConn0.start();
+         
+         xaConn1.start();
+                  
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)xaConn1).registerFailoverListener(failoverListener);
+         
+         //Send a message to each queue
+         
+         Session sess = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sess.createProducer(queue[0]);
+         
+         TextMessage sent0 = sess.createTextMessage("plop0");
+         
+         prod.send(sent0);
+         
+         sess.close();
+         
+         sess = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         prod = sess.createProducer(queue[1]);
+         
+         TextMessage sent1 = sess.createTextMessage("plop1");
+         
+         prod.send(sent1);
+         
+         sess.close();
+         
+         
+         
+         
+         XASession sess0 = xaConn0.createXASession();
+         
+         XAResource res0 = sess0.getXAResource();
+         
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+         
+         MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+         
+         
+         XASession sess1 = xaConn1.createXASession();
+         
+         XAResource res1 = sess1.getXAResource();
+         
+         MessageProducer prod1 = sess1.createProducer(queue[1]);
+         
+         MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+         
+                           
+         tm.begin();
+         
+         Transaction tx = tm.getTransaction();
+         
+         tx.enlistResource(res0);
+         
+         tx.enlistResource(res1);
+         
+         //receive a message
+         
+         TextMessage received = (TextMessage)cons0.receive(2000);
+         
+         assertNotNull(received);
+         
+         assertEquals(sent0.getText(), received.getText());
+         
+         
+         received = (TextMessage)cons1.receive(2000);
+         
+         assertNotNull(received);
+         
+         assertEquals(sent1.getText(), received.getText());
+         
+                  
+                  
+         //Send a message
+         
+         TextMessage msg0 = sess0.createTextMessage("Cupid stunt0");
+         
+         prod0.send(msg0);
+         
+         TextMessage msg1 = sess1.createTextMessage("Cupid stunt1");
+         
+         prod1.send(msg1);
+         
+         
+
+         tx.delistResource(res0, XAResource.TMSUCCESS);
+         
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         
+         //Now kill node 1
+         
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         log.info("failover completed");
+         
+         //Now commit the transaction
+         
+         tm.commit();
+         
+         cons0.close();
+         
+         cons1.close();
+         
+         // Message should now be receivable
+         
+         conn0.start();
+         
+         Session sessRec0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         cons0 = sessRec0.createConsumer(queue[0]);
+         
+         TextMessage mrec = (TextMessage)cons0.receive(2000);
+         
+         assertNotNull(mrec);
+         
+         assertEquals(msg0.getText(), mrec.getText());
+         
+         Message m = cons0.receive(2000);
+         
+         //And the other message should be acked
+         assertNull(m);                
+         
+         
+         conn1.start();
+                  
+         Session sessRec1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         cons1 = sessRec1.createConsumer(queue[0]);
+         
+         mrec = (TextMessage)cons1.receive(2000);
+         
+         assertNotNull(mrec);
+         
+         assertEquals(msg1.getText(), mrec.getText());
+         
+         m = cons1.receive(2000);
+         
+         //And the other message should be acked
+         assertNull(m);   
+         
+         assertEquals(0, ((JBossConnection)xaConn1).getServerID());
+
+      }
+      finally
+      {
+         if (xaConn1 != null)
+         {
+            xaConn1.close();
+         }
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
+   
+   
+   
+   
+   public void testSendAndReceiveTwoConnectionsFailAfterPrepareAndRecover() throws Exception
+   {
+      XAConnection xaConn0 = null;
+      
+      XAConnection xaConn1 = null;
+      
+      XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+      
+      Connection conn0 = null;
+      
+      Connection conn1 = null;
+      
+      try
+      {
+         xaConn0 = xaCF.createXAConnection();
+         
+         assertEquals(0, ((JBossConnection)xaConn0).getServerID());
+
+         xaConn1 = xaCF.createXAConnection();
+         
+         assertEquals(1, ((JBossConnection)xaConn1).getServerID());
+
+         conn0 = cf.createConnection();
+         
+         assertEquals(0, ((JBossConnection)conn0).getServerID());
+                           
+         conn1 = cf.createConnection();         
+         
+         assertEquals(1, ((JBossConnection)conn1).getServerID());
+         
+         xaConn0.start();
+         
+         xaConn1.start();
+                  
+
+         // register a failover listener
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)xaConn1).registerFailoverListener(failoverListener);
+         
+         //Send a message to each queue
+         
+         Session sess = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sess.createProducer(queue[0]);
+         
+         TextMessage sent0 = sess.createTextMessage("plop0");
+         
+         prod.send(sent0);
+         
+         sess.close();
+         
+         sess = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         prod = sess.createProducer(queue[1]);
+         
+         TextMessage sent1 = sess.createTextMessage("plop1");
+         
+         prod.send(sent1);
+         
+         sess.close();
+         
+         
+         
+         
+         XASession sess0 = xaConn0.createXASession();
+         
+         XAResource res0 = sess0.getXAResource();
+         
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+         
+         MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+         
+         
+         XASession sess1 = xaConn1.createXASession();
+         
+         XAResource res1 = sess1.getXAResource();
+         
+         MessageProducer prod1 = sess1.createProducer(queue[1]);
+         
+         MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+         
+                           
+         tm.begin();
+         
+         Transaction tx = tm.getTransaction();
+         
+         tx.enlistResource(res0);
+         
+         tx.enlistResource(res1);
+         
+         //receive a message
+         
+         TextMessage received = (TextMessage)cons0.receive(2000);
+         
+         assertNotNull(received);
+         
+         assertEquals(sent0.getText(), received.getText());
+         
+         
+         received = (TextMessage)cons1.receive(2000);
+         
+         assertNotNull(received);
+         
+         assertEquals(sent1.getText(), received.getText());
+         
+                  
+                  
+         //Send a message
+         
+         TextMessage msg0 = sess0.createTextMessage("Cupid stunt0");
+         
+         prod0.send(msg0);
+         
+         TextMessage msg1 = sess1.createTextMessage("Cupid stunt1");
+         
+         prod1.send(msg1);
+                  
+         tx.delistResource(res0, XAResource.TMSUCCESS);
+         
+         tx.delistResource(res1, XAResource.TMSUCCESS);
+         
+         // We poison node 1 so that it crashes after prepare but before commit is processed
+         
+         ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
+         
+         tm.commit();
+         
+         //Now kill node 1
+         
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+         
+         //When the node comes back up, the invocation to commit() will be retried on the new node.
+         //The new node will by then already have loaded into memory the prepared transactions from
+         //the failed node so this should complete ok
+
+         // failover complete
+         log.info("failover completed");
+         
+         cons0.close();
+         
+         cons1.close();
+                           
+
+         // Message should now be receivable
+         
+         conn0.start();
+         
+         Session sessRec0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         cons0 = sessRec0.createConsumer(queue[0]);
+         
+         TextMessage mrec = (TextMessage)cons0.receive(2000);
+         
+         assertNotNull(mrec);
+         
+         assertEquals(msg0.getText(), mrec.getText());
+         
+         Message m = cons0.receive(2000);
+         
+         //And the other message should be acked
+         assertNull(m);                
+         
+         
+         conn1.start();
+                  
+         Session sessRec1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         cons1 = sessRec1.createConsumer(queue[0]);
+         
+         mrec = (TextMessage)cons1.receive(2000);
+         
+         assertNotNull(mrec);
+         
+         assertEquals(msg1.getText(), mrec.getText());
+         
+         m = cons1.receive(2000);
+         
+         //And the other message should be acked
+         assertNull(m);   
+         
+         assertEquals(0, ((JBossConnection)xaConn1).getServerID());
+
+      }
+      finally
+      {
+         if (xaConn1 != null)
+         {
+            xaConn1.close();
+         }
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+   }
+   
+
+   
+
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   static class DummyXAResource implements XAResource
+   {
+      boolean failOnPrepare;
+      
+      DummyXAResource()
+      {         
+      }
+      
+      public void commit(Xid arg0, boolean arg1) throws XAException
+      {         
+      }
+
+      public void end(Xid arg0, int arg1) throws XAException
+      {
+      }
+
+      public void forget(Xid arg0) throws XAException
+      {
+      }
+
+      public int getTransactionTimeout() throws XAException
+      {
+          return 0;
+      }
+
+      public boolean isSameRM(XAResource arg0) throws XAException
+      {
+         return false;
+      }
+
+      public int prepare(Xid arg0) throws XAException
+      {
+         return XAResource.XA_OK;
+      }
+
+      public Xid[] recover(int arg0) throws XAException
+      {
+         return null;
+      }
+
+      public void rollback(Xid arg0) throws XAException
+      {
+      }
+
+      public boolean setTransactionTimeout(int arg0) throws XAException
+      {
+         return false;
+      }
+
+      public void start(Xid arg0, int arg1) throws XAException
+      {
+
+      }
+      
+   }
+   
+
+}

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -31,12 +31,17 @@
 import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.InitialContext;
+
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.FailoverListener;
 import org.jboss.jms.client.JBossConnection;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.org">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -276,5 +281,36 @@
    }
 
    // Inner classes --------------------------------------------------------------------------------
+   
+   protected class SimpleFailoverListener implements FailoverListener
+   {
+      private LinkedQueue buffer;
 
+      public SimpleFailoverListener()
+      {
+         buffer = new LinkedQueue();
+      }
+
+      public void failoverEventOccured(FailoverEvent event)
+      {
+         try
+         {
+            buffer.put(event);
+         }
+         catch(InterruptedException e)
+         {
+            throw new RuntimeException("Putting thread interrupted while trying to add event " +
+               "to buffer", e);
+         }
+      }
+
+      /**
+       * Blocks until a FailoverEvent is available or timeout occurs, in which case returns null.
+       */
+      public FailoverEvent getEvent(long timeout) throws InterruptedException
+      {
+         return (FailoverEvent)buffer.poll(timeout);
+      }
+   }
+
 }

Modified: trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java	2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java	2007-02-14 20:25:03 UTC (rev 2313)
@@ -94,14 +94,13 @@
          if (request.getRequestType() == TransactionRequest.TWO_PHASE_COMMIT_REQUEST
              && type == TYPE_2PC_COMMIT)
          {
-            //Crash on 2pc commit - used in message bridge tests
+            //Crash before 2pc commit (after prepare)- used in message bridge tests
             
             log.info("##### Crashing on 2PC commit!!");
             
             crash(target);
          }
-         else
-         if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST &&
+         else if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST &&
              type == FAIL_AFTER_SENDTRANSACTION)
          {
             invocation.invokeNext();




More information about the jboss-cvs-commits mailing list