[jboss-cvs] JBoss Messaging SVN: r2455 - in trunk: src/main/org/jboss/jms/tx and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 26 20:42:58 EST 2007


Author: timfox
Date: 2007-02-26 20:42:58 -0500 (Mon, 26 Feb 2007)
New Revision: 2455

Modified:
   trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
   trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
   trunk/src/main/org/jboss/jms/tx/ResourceManager.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Log:
various fixes and extra test stuff


Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2007-02-27 01:14:28 UTC (rev 2454)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2007-02-27 01:42:58 UTC (rev 2455)
@@ -145,46 +145,19 @@
 
    <!-- Managed operations -->
 
-    <operation>
-       <description>JBoss Service lifecycle operation</description>
-       <name>create</name>
-    </operation>
-
-    <operation>
-       <description>Prepares a failover from a given node</description>
-       <name>failOver</name>
-        <parameter>
-          <description>The Failed ID </description>
-          <name>failedId</name>
-          <type>int</type>
-        </parameter>
-
-    </operation>
-
-    <operation>
-       <description>List Defined Bindings</description>
-       <name>listBindings</name>
-       <return-type>java.lang.String</return-type>
-    </operation>
-
    <operation>
       <description>JBoss Service lifecycle operation</description>
-      <name>start</name>
+      <name>create</name>
    </operation>
 
    <operation>
       <description>JBoss Service lifecycle operation</description>
-      <name>stop</name>
+      <name>start</name>
    </operation>
 
    <operation>
-      <description></description>
+      <description>JBoss Service lifecycle operation</description>
       <name>stop</name>
-      <parameter>
-        <description>Should we send a notification about leaving cluster</description>
-        <name>sendNotification</name>
-        <type>boolean</type>
-      </parameter>
    </operation>
 
    <operation>

Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2007-02-27 01:14:28 UTC (rev 2454)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2007-02-27 01:42:58 UTC (rev 2455)
@@ -67,6 +67,10 @@
    private List sessionStatesList;
 
    private boolean clientSide;
+   
+   private boolean hasPersistentAcks;
+   
+   private boolean failedOver;
 
    // Static --------------------------------------------------------
 
@@ -104,7 +108,22 @@
       SessionTxState sessionTxState = getSessionTxState(sessionId);
 
       sessionTxState.addAck(info);
+      
+      if (info.getMessageProxy().getMessage().isReliable())
+      {
+         hasPersistentAcks = true;
+      }
    }
+   
+   public boolean hasPersistentAcks()
+   {
+      return hasPersistentAcks;
+   }
+   
+   public boolean isFailedOver()
+   {
+      return failedOver;
+   }
 
    public void clearMessages()
    {
@@ -156,7 +175,7 @@
       {
          throw new IllegalStateException("Cannot call this method on the server side");
       }
-
+      
       // Note we have to do this in one go since there may be overlap between old and new session
       // IDs and we don't want to overwrite keys in the map.
 
@@ -183,6 +202,8 @@
          // swap
          sessionStatesMap = tmpMap;
       }
+      
+      failedOver = true;
    }
 
    /**

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-02-27 01:14:28 UTC (rev 2454)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2007-02-27 01:42:58 UTC (rev 2455)
@@ -184,6 +184,8 @@
       
       ClientTransaction tx = this.getTxInternal(xid);
       
+      checkAndRollbackJMS(tx, xid);
+      
       // Invalid xid
       if (tx == null)
       {
@@ -307,6 +309,8 @@
       state.setState(ClientTransaction.TX_ENDED);
    }
    
+   
+   
    int prepare(Xid xid, ConnectionDelegate connection) throws XAException
    {
       if (trace) { log.trace("preparing " + xid); }
@@ -318,6 +322,8 @@
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
       } 
       
+      checkAndRollbackXA(state, xid);
+      
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.TWO_PHASE_PREPARE_REQUEST, xid, state);
       
@@ -346,6 +352,8 @@
             throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
          }
          
+         checkAndRollbackXA(tx, xid);
+         
          TransactionRequest request =
             new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
          
@@ -368,7 +376,7 @@
             //may happen if we have recovered from failure and the transaction manager
             //is calling commit on the transaction as part of the recovery process.
          }
-         
+                           
          TransactionRequest request =
             new TransactionRequest(TransactionRequest.TWO_PHASE_COMMIT_REQUEST, xid, null);
          
@@ -638,6 +646,50 @@
       }
    }
    
+   private void checkAndRollbackJMS(ClientTransaction state, Object xid) throws JMSException
+   {
+      Exception e = checkAndRollback(state, xid, false);
+      if (e != null)
+      {
+         throw (JMSException)e;
+      }
+   }
+   
+   private void checkAndRollbackXA(ClientTransaction state, Object xid) throws XAException
+   {
+      Exception e = checkAndRollback(state, xid, true);
+      if (e != null)
+      {
+         throw (XAException)e;
+      }
+   }
+   
+   private Exception checkAndRollback(ClientTransaction state, Object xid, boolean xa)
+   {
+      if (state.isFailedOver() && state.hasPersistentAcks())
+      {
+         // http://jira.jboss.org/jira/browse/JBMESSAGING-883
+         // If a transaction has persistent acks in it and it has failed over from another server
+         // then it's possible that on failover another consumer got the messages that we have already
+         // received. Therfore to be strict and avoid any possibility of duplicate delivery we must
+         // doom the transaction
+         removeTx(xid);
+         
+         final String msg = "Rolled back tx branch to avoid possibility of duplicates http://jira.jboss.org/jira/browse/JBMESSAGING-883";
+         
+         if (xa)
+         {
+            return new MessagingXAException(XAException.XA_HEURRB, msg);
+         }
+         else
+         {
+            return new MessagingTransactionRolledBackException(msg);
+         }            
+      }
+      
+      return null;
+   }
+   
    // Inner Classes --------------------------------------------------------------------------------
   
 }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-27 01:14:28 UTC (rev 2454)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java	2007-02-27 01:42:58 UTC (rev 2455)
@@ -6,28 +6,29 @@
  */
 package org.jboss.test.messaging.jms.clustering;
 
-import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
-import org.jboss.jms.client.JBossConnection;
-import org.jboss.jms.client.FailoverEvent;
-import org.jboss.jms.client.remoting.JMSRemotingConnection;
-import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.jms.Connection;
-import javax.jms.Session;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
-import javax.jms.MessageConsumer;
 import javax.jms.QueueBrowser;
+import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
 
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.HashSet;
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.DelegateSupport;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.util.MessagingTransactionRolledBackException;
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
 
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -1125,95 +1126,98 @@
       }
    }
 
-   public void testTransactedSessionWithAcknowledgmentsCommitOnFailover() throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         // skip connection to node 0
-         conn = cf.createConnection();
-         conn.close();
-
-         // create a connection to node 1
-         conn = cf.createConnection();
-
-         conn.start();
-
-         assertEquals(1, ((JBossConnection)conn).getServerID());
-
-         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-
-         // send 2 messages (one persistent and one non-persistent)
-
-         MessageProducer prod = session.createProducer(queue[1]);
-
-         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
-         prod.send(session.createTextMessage("clik-persistent"));
-         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         prod.send(session.createTextMessage("clak-non-persistent"));
-
-         session.commit();
-
-         // close the producer
-         prod.close();
-
-         // create a consumer and receive messages, but don't acknowledge
-
-         MessageConsumer cons = session.createConsumer(queue[1]);
-         TextMessage clik = (TextMessage)cons.receive(2000);
-         assertEquals("clik-persistent", clik.getText());
-         TextMessage clak = (TextMessage)cons.receive(2000);
-         assertEquals("clak-non-persistent", clak.getText());
-
-         // register a failover listener
-         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
-         ((JBossConnection)conn).registerFailoverListener(failoverListener);
-
-         log.debug("killing node 1 ....");
-
-         ServerManagement.kill(1);
-
-         log.info("########");
-         log.info("######## KILLED NODE 1");
-         log.info("########");
-
-         // wait for the client-side failover to complete
-
-         while(true)
-         {
-            FailoverEvent event = failoverListener.getEvent(120000);
-            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
-            {
-               break;
-            }
-            if (event == null)
-            {
-               fail("Did not get expected FAILOVER_COMPLETED event");
-            }
-         }
-
-         // failover complete
-         log.info("failover completed");
-
-         assertEquals(0, ((JBossConnection)conn).getServerID());
-
-         // acknowledge the messages
-         session.commit();
-
-         // make sure no messages are left in the queue
-         Message m = cons.receive(1000);
-         assertNull(m);
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
+   // Commented out until http://jira.jboss.org/jira/browse/JBMESSAGING-883
+//   public void testTransactedSessionWithAcknowledgmentsCommitOnFailover() throws Exception
+//   {
+//      Connection conn = null;
+//
+//      try
+//      {
+//         // skip connection to node 0
+//         conn = cf.createConnection();
+//         conn.close();
+//
+//         // create a connection to node 1
+//         conn = cf.createConnection();
+//
+//         conn.start();
+//
+//         assertEquals(1, ((JBossConnection)conn).getServerID());
+//
+//         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+//
+//         // send 2 messages (one persistent and one non-persistent)
+//
+//         MessageProducer prod = session.createProducer(queue[1]);
+//
+//         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+//         prod.send(session.createTextMessage("clik-persistent"));
+//         prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+//         prod.send(session.createTextMessage("clak-non-persistent"));
+//
+//         session.commit();
+//
+//         // close the producer
+   
+//         prod.close();
+//
+//         // create a consumer and receive messages, but don't acknowledge
+//
+//         MessageConsumer cons = session.createConsumer(queue[1]);
+//         TextMessage clik = (TextMessage)cons.receive(2000);
+//         assertEquals("clik-persistent", clik.getText());
+//         TextMessage clak = (TextMessage)cons.receive(2000);
+//         assertEquals("clak-non-persistent", clak.getText());
+//
+//         // register a failover listener
+//         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+//         ((JBossConnection)conn).registerFailoverListener(failoverListener);
+//
+//         log.debug("killing node 1 ....");
+//
+//         ServerManagement.kill(1);
+//
+//         log.info("########");
+//         log.info("######## KILLED NODE 1");
+//         log.info("########");
+//
+//         // wait for the client-side failover to complete
+//
+//         while(true)
+//         {
+//            FailoverEvent event = failoverListener.getEvent(120000);
+//            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+//            {
+//               break;
+//            }
+//            if (event == null)
+//            {
+//               fail("Did not get expected FAILOVER_COMPLETED event");
+//            }
+//         }
+//
+//         // failover complete
+//         log.info("failover completed");
+//
+//         assertEquals(0, ((JBossConnection)conn).getServerID());
+//
+//         // acknowledge the messages
+//         session.commit();
+//
+//         // make sure no messages are left in the queue
+//         Message m = cons.receive(1000);
+//         assertNull(m);
+//      }
+//      finally
+//      {
+//         if (conn != null)
+//         {
+//            conn.close();
+//         }
+//      }
+//   }
+   
+  
    public void testTransactedSessionWithAcknowledgmentsRollbackOnFailover() throws Exception
    {
       Connection conn = null;
@@ -1679,74 +1683,76 @@
       failureOnInvocation(PoisonInterceptor.FAIL_AFTER_SEND);
    }
 
-   public void testFailureRightAfterSendTransaction() throws Exception
-   {
-      Connection conn = null;
-      Connection conn0 = null;
+   // Commented out until this is complete:
+   // http://jira.jboss.org/jira/browse/JBMESSAGING-604
+//   public void testFailureRightAfterSendTransaction() throws Exception
+//   {
+//      Connection conn = null;
+//      Connection conn0 = null;
+//
+//      try
+//      {
+//         conn0 = cf.createConnection();
+//
+//         assertEquals(0, ((JBossConnection)conn0).getServerID());
+//
+//         conn0.close();
+//
+//         conn = cf.createConnection();
+//
+//         assertEquals(1, ((JBossConnection)conn).getServerID());
+//
+//         // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
+//         // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
+//         JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
+//            getDelegate()).getRemotingConnection();
+//         rc.removeConnectionListener();
+//
+//         // poison the server
+//         ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
+//
+//         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+//
+//         conn.start();
+//
+//         MessageProducer producer = session.createProducer(queue[0]);
+//
+//         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+//
+//         MessageConsumer consumer = session.createConsumer(queue[0]);
+//
+//         producer.send(session.createTextMessage("before-poison1"));
+//         producer.send(session.createTextMessage("before-poison2"));
+//         producer.send(session.createTextMessage("before-poison3"));
+//         session.commit();
+//
+//         Thread.sleep(2000);
+//
+//         for (int i = 1; i <= 10; i++)
+//         {
+//            TextMessage tm = (TextMessage) consumer.receive(5000);
+//
+//            assertNotNull(tm);
+//
+//            assertEquals("before-poison" + i, tm.getText());
+//         }
+//
+//         assertNull(consumer.receive(1000));
+//
+//      }
+//      finally
+//      {
+//         if (conn != null)
+//         {
+//            conn.close();
+//         }
+//         if (conn0 != null)
+//         {
+//            conn0.close();
+//         }
+//      }
+//   }
 
-      try
-      {
-         conn0 = cf.createConnection();
-
-         assertEquals(0, ((JBossConnection)conn0).getServerID());
-
-         conn0.close();
-
-         conn = cf.createConnection();
-
-         assertEquals(1, ((JBossConnection)conn).getServerID());
-
-         // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
-         // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
-         JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
-            getDelegate()).getRemotingConnection();
-         rc.removeConnectionListener();
-
-         // poison the server
-         ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
-
-         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-
-         conn.start();
-
-         MessageProducer producer = session.createProducer(queue[0]);
-
-         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
-         MessageConsumer consumer = session.createConsumer(queue[0]);
-
-         producer.send(session.createTextMessage("before-poison1"));
-         producer.send(session.createTextMessage("before-poison2"));
-         producer.send(session.createTextMessage("before-poison3"));
-         session.commit();
-
-         Thread.sleep(2000);
-
-         for (int i = 1; i <= 3; i++)
-         {
-            TextMessage tm = (TextMessage) consumer.receive(5000);
-
-            assertNotNull(tm);
-
-            assertEquals("before-poison" + i, tm.getText());
-         }
-
-         assertNull(consumer.receive(1000));
-
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-         if (conn0 != null)
-         {
-            conn0.close();
-         }
-      }
-   }
-
    public void testCloseConsumer() throws Exception
    {
       Connection conn0 = null;
@@ -1895,8 +1901,273 @@
       }
    }
 
+   //See http://jira.jboss.org/jira/browse/JBMESSAGING-883
+   //This is commented out until we have a better fix in 1.2.1
+//   public void testFailoverDeliveryRecoveryTransacted() throws Exception
+//   {
+//      Connection conn0 = null;
+//      Connection conn1 = null;
+//
+//      try
+//      {
+//         conn0 = cf.createConnection();
+//
+//         // Objects Server1
+//         conn1 = cf.createConnection();
+//
+//         assertEquals(1, ((JBossConnection)conn1).getServerID());
+//
+//         Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+//         
+//         Session session2 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+//
+//         MessageConsumer cons1 = session1.createConsumer(queue[1]);
+//         
+//         MessageConsumer cons2 = session2.createConsumer(queue[1]);
+//         
+//         MessageProducer prod = session1.createProducer(queue[1]);
+//         
+//         conn1.start();
+//                  
+//         TextMessage tm1 = session1.createTextMessage("message1");
+//         
+//         TextMessage tm2 = session1.createTextMessage("message2");
+//         
+//         TextMessage tm3 = session1.createTextMessage("message3");
+//         
+//         prod.send(tm1);
+//         
+//         prod.send(tm2);
+//         
+//         prod.send(tm3);
+//         
+//         session1.commit();
+//                           
+//         TextMessage rm1 = (TextMessage)cons1.receive(1000);
+//         
+//         assertNotNull(rm1);
+//         
+//         assertEquals(tm1.getText(), rm1.getText());
+//                                    
+//         TextMessage rm2 = (TextMessage)cons2.receive(1000);
+//         
+//         assertNotNull(rm2);
+//         
+//         assertEquals(tm2.getText(), rm2.getText());
+//         
+//         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+//         ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+//
+//         log.debug("killing node 1 ....");
+//
+//         ServerManagement.kill(1);
+//
+//         log.info("########");
+//         log.info("######## KILLED NODE 1");
+//         log.info("########");
+//
+//         // wait for the client-side failover to complete
+//
+//         while(true)
+//         {
+//            FailoverEvent event = failoverListener.getEvent(120000);
+//            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+//            {
+//               break;
+//            }
+//            if (event == null)
+//            {
+//               fail("Did not get expected FAILOVER_COMPLETED event");
+//            }
+//         }
+//
+//         // failover complete
+//         log.info("failover completed");
+//         
+//
+//         //now commit
+//         
+//         session1.commit();
+//         
+//         session2.commit();
+//         
+//         session1.close();
+//         
+//         session2.close();;
+//         
+//         Session session3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         
+//         MessageConsumer cons3 = session3.createConsumer(queue[0]);
+//         
+//         TextMessage rm3 = (TextMessage)cons3.receive(2000);
+//         
+//         assertNotNull(rm3);
+//         
+//         assertEquals(tm3.getText(), rm3.getText());
+//         
+//         rm3 = (TextMessage)cons3.receive(2000);
+//         
+//         assertNull(rm3);
+//
+//         
+//      }
+//      finally
+//      {
+//         if (conn1 != null)
+//         {
+//            conn1.close();
+//         }
+//
+//         if (conn0 != null)
+//         {
+//            conn0.close();
+//         }
+//      }
+//   }
    
+   // See http://jira.jboss.org/jira/browse/JBMESSAGING-883
+   // This tests our current behaviour - which is throwing an exception
+   // This will change in 1.2.1
+   public void testFailoverDeliveryRecoveryTransacted() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
 
+      try
+      {
+         conn0 = cf.createConnection();
+
+         // Objects Server1
+         conn1 = cf.createConnection();
+
+         assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+         Session session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+         
+         Session session2 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageConsumer cons1 = session1.createConsumer(queue[1]);
+         
+         MessageConsumer cons2 = session2.createConsumer(queue[1]);
+         
+         MessageProducer prod = session1.createProducer(queue[1]);
+         
+         conn1.start();
+                  
+         TextMessage tm1 = session1.createTextMessage("message1");
+         
+         TextMessage tm2 = session1.createTextMessage("message2");
+         
+         TextMessage tm3 = session1.createTextMessage("message3");
+         
+         prod.send(tm1);
+         
+         prod.send(tm2);
+         
+         prod.send(tm3);
+         
+         session1.commit();
+                           
+         TextMessage rm1 = (TextMessage)cons1.receive(1000);
+         
+         assertNotNull(rm1);
+         
+         assertEquals(tm1.getText(), rm1.getText());
+                                    
+         TextMessage rm2 = (TextMessage)cons2.receive(1000);
+         
+         assertNotNull(rm2);
+         
+         assertEquals(tm2.getText(), rm2.getText());
+         
+         SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+         ((JBossConnection)conn1).registerFailoverListener(failoverListener);
+
+         log.debug("killing node 1 ....");
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED NODE 1");
+         log.info("########");
+
+         // wait for the client-side failover to complete
+
+         while(true)
+         {
+            FailoverEvent event = failoverListener.getEvent(120000);
+            if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+            {
+               break;
+            }
+            if (event == null)
+            {
+               fail("Did not get expected FAILOVER_COMPLETED event");
+            }
+         }
+
+         // failover complete
+         log.info("failover completed");
+         
+
+         //now commit
+         
+         try
+         {
+            session1.commit();
+            
+            fail();
+         }
+         catch (MessagingTransactionRolledBackException e)
+         {
+            //Ok
+         }
+         
+         try
+         {
+            session2.commit();
+            
+            fail();
+         }
+         catch (MessagingTransactionRolledBackException e)
+         {
+            //Ok
+         }
+         
+         session1.close();
+         
+         session2.close();;
+         
+         Session session3 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons3 = session3.createConsumer(queue[0]);
+         
+         TextMessage rm3 = (TextMessage)cons3.receive(2000);
+         
+         assertNotNull(rm3);
+         
+         assertEquals(tm3.getText(), rm3.getText());
+         
+         rm3 = (TextMessage)cons3.receive(2000);
+         
+         assertNull(rm3);
+
+         
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+      }
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
@@ -2055,6 +2326,8 @@
          }
       }
    }
+   
+   
 
    // Inner classes --------------------------------------------------------------------------------
    

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-02-27 01:14:28 UTC (rev 2454)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java	2007-02-27 01:42:58 UTC (rev 2455)
@@ -189,73 +189,8 @@
       }
    }
    
-   class Killer implements Runnable
-   { 
-      boolean failed;
-      
-      public void run()
-      {
-         try
-         {                                     
-            Thread.sleep(10000);
-               
-            log.info("Killing server 0");
-            ServerManagement.kill(0);
-            
-            Thread.sleep(10000);
-            
-            log.info("starting server 0");
-            ServerManagement.start(0, "all");
-            
-            Thread.sleep(10000);
-            
-            log.info("Killing server 1");
-            ServerManagement.kill(1);
-            
-            Thread.sleep(10000);
-            
-            log.info("Starting server 1");
-            ServerManagement.start(1, "all");
-            
-            Thread.sleep(10000);
-            
-            log.info("Killing server 0");
-            ServerManagement.kill(0);
-            
-            Thread.sleep(10000);
-            
-            log.info("Starting server 0");
-            ServerManagement.start(0, "all");
-            
-            Thread.sleep(10000);
-            
-            log.info("Killing server 1");
-            ServerManagement.kill(1);
-            
-            Thread.sleep(10000);
-            
-            log.info("Starting server 1");
-            ServerManagement.start(1, "all");
-            
-            Thread.sleep(10000);
-            
-            log.info("Killing server 0");
-            ServerManagement.kill(0);
-            
-            Thread.sleep(10000);
-            
-            log.info("Starting server 0");
-            ServerManagement.start(0, "all");
-
-         }
-         catch (Exception e)
-         {               
-            failed = true;
-         }
-      }
-      
-   }
    
+   
    public void testFailoverFloodTwoServers() throws Exception
    {
       Connection conn = null;
@@ -272,9 +207,7 @@
 
          Latch latch = new Latch();
          
-         final int NUM_MESSAGES = 10000;         
-         
-         MessageListener list = new MyListener(latch, NUM_MESSAGES);
+         MyListener list = new MyListener(latch);
 
          cons.setMessageListener(list);
 
@@ -286,26 +219,43 @@
 
          int count = 0;
          
-         Thread t = new Thread(new Killer());
+         Killer killer = new Killer();
          
+         Thread t = new Thread(killer);
+         
          t.start();
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
+         
+         while (!killer.isDone())
          {
             TextMessage tm = sessSend.createTextMessage("message " + count);
 
             prod.send(tm);
             
-            Thread.sleep(250);
+            Thread.sleep(10);
 
-            log.info("sent " + count);
+            if (count % 100 == 0)
+            {
+               log.info("sent " + count);
+            }
 
             count++;
          }
          
+         log.info("sending done");
+         
          t.join();
          
-         latch.acquire();
+         log.info("stopping listener");
+         
+         if (killer.failed)
+         {
+            fail();
+         }
+         
+         if (list.failed)
+         {
+            fail();
+         }
       }
       catch (Exception e)
       {
@@ -351,21 +301,99 @@
 
    // Inner classes --------------------------------------------------------------------------------
 
+   class Killer implements Runnable
+   { 
+      volatile boolean failed;
+      
+      volatile boolean done;
+      
+      public boolean isDone()
+      {
+         return done;
+      }
+      
+      public void run()
+      {
+         try
+         {                                     
+            Thread.sleep(10000);
+               
+            log.info("Killing server 0");
+            ServerManagement.kill(0);
+            
+            Thread.sleep(5000);
+            
+            log.info("starting server 0");
+            ServerManagement.start(0, "all", false);
+            ServerManagement.deployQueue("testDistributedQueue", 0);
+            
+            Thread.sleep(5000);
+            
+            log.info("Killing server 1");
+            ServerManagement.kill(1);
+            
+            Thread.sleep(5000);
+            
+            log.info("Starting server 1");
+            ServerManagement.start(1, "all", false);
+            ServerManagement.deployQueue("testDistributedQueue", 1);
+            
+            Thread.sleep(5000);
+            
+            log.info("Killing server 0");
+            ServerManagement.kill(0);
+            
+            Thread.sleep(5000);
+            
+            log.info("Starting server 0");
+            ServerManagement.start(0, "all", false);
+            ServerManagement.deployQueue("testDistributedQueue", 0);
+            
+            Thread.sleep(5000);
+            
+            log.info("Killing server 1");
+            ServerManagement.kill(1);
+            
+            Thread.sleep(5000);
+            
+            log.info("Starting server 1");
+            ServerManagement.start(1, "all", false);
+            ServerManagement.deployQueue("testDistributedQueue", 1);
+            
+            Thread.sleep(5000);
+            
+            log.info("Killing server 0");
+            ServerManagement.kill(0);
+            
+            Thread.sleep(5000);
+            
+            log.info("Starting server 0");
+            ServerManagement.start(0, "all", false);
+            ServerManagement.deployQueue("testDistributedQueue", 0);
+            
+            log.info("killer DONE");
+         }
+         catch (Exception e)
+         {               
+            failed = true;
+         }
+         
+         done = true;
+      }
+      
+   }
+   
    class MyListener implements MessageListener
    {
       int count = 0;
       
       Latch latch;
       
-      boolean failed;
+      volatile boolean failed;
       
-      int num;
-      
-      MyListener(Latch latch, int num)
+      MyListener(Latch latch)
       {
          this.latch = latch;
-         
-         this.num = num;
       }
            
       public void onMessage(Message msg)
@@ -374,7 +402,10 @@
          {
             TextMessage tm = (TextMessage)msg;
             
-            log.info("Received message " + tm.getText());
+            if (count % 100 == 0)
+            {
+               log.info("Received message " + tm.getText());
+            }
             
             if (!tm.getText().equals("message " + count))
             {
@@ -384,15 +415,11 @@
             }
             
             count++;
-            
-            if (count == num)
-            {
-               latch.release();
-            }
          }
          catch (Exception e)
          {
             log.error("Failed to receive", e);
+            failed = true;
          }
       }
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2007-02-27 01:14:28 UTC (rev 2454)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java	2007-02-27 01:42:58 UTC (rev 2455)
@@ -453,7 +453,6 @@
       
       XAConnectionFactory xaCF = (XAConnectionFactory)cf;
       
-
       try
       {
          xaConn0 = xaCF.createXAConnection();
@@ -609,7 +608,7 @@
          
          cons1.close();
          
-         // Message should now be receivable
+         // Messages should now be receivable
 
          Connection conn = null;
          try
@@ -638,13 +637,16 @@
                numberOfReceivedMessages++;
             }
 
-
+            //These two should be acked
+            
             assertFalse("\"plop0\" message was duplicated",
                receivedMessages.contains("plop0"));
 
             assertFalse("\"plop1\" message was duplicated",
-               receivedMessages.contains("plop0"));
+               receivedMessages.contains("plop1"));
 
+            //And these should be receivable
+            
             assertTrue("\"Cupid stunt0\" message wasn't received",
                receivedMessages.contains("Cupid stunt0"));
 




More information about the jboss-cvs-commits mailing list