[jboss-cvs] JBoss Messaging SVN: r8130 - in branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809: tests/src/org/jboss/test/messaging/jms/clustering and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 19 10:59:48 EST 2010


Author: jbertram at redhat.com
Date: 2010-11-19 10:59:47 -0500 (Fri, 19 Nov 2010)
New Revision: 8130

Added:
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
Modified:
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
Log:
SOA-2526

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2010-11-09 19:50:52 UTC (rev 8129)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2010-11-19 15:59:47 UTC (rev 8130)
@@ -1949,12 +1949,12 @@
 
       JBossDestination dest = new JBossQueue(queueName);
 
-      //We don't care about redelivery delays and number of attempts for a direct consumer
-
+      //We don't care about redelivery delays for a direct consumer
+      //We do care about number of attempts, see JBMESSAGING-1774
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(consumerID, binding.queue,
                                     binding.queue.getName(), this, selectorString, false,
-                                    dest, null, null, 0, -1, true, false, prefetchSize);
+                                    dest, null, null, 0, 1, true, false, prefetchSize);
 
       ConsumerAdvised advised;
 

Copied: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java (from rev 7883, branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2010-11-19 15:59:47 UTC (rev 8130)
@@ -0,0 +1,1135 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.tx.MessagingXid;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * Test for https://jira.jboss.org/jira/browse/JBMESSAGING-1742
+ * 
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class DestinationRedeployTest extends ClusteringTestBase
+{
+   //clustered2NonclusteredQueue
+   private Queue cQueue;
+   
+   //nonclustered2ClusteredQueue
+   private Queue nQueue;
+   
+   //clustered2NonclusteredTopic
+   private Topic cTopic;
+   
+   //nonclustered2ClusteredTopic
+   private Topic nTopic;
+   
+   public DestinationRedeployTest(String name)
+   {
+      super(name);
+   }
+   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   //do a redeploy and test the queues work normally by sending some messages and receiving them.
+   public void testRedeployQueue() throws Exception
+   {
+      String msgBase = "testRedeployQueue";
+      int numMsg = 50;
+
+      deployDestinations();
+      redeployDestinations(true);
+      
+      sendMessages(0, cQueue, msgBase, numMsg);
+      sendMessages(1, nQueue, msgBase, numMsg);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //do a redeploy and test the topics work normally by sending some messages and receiving them.
+   public void testRedeployTopic() throws Exception
+   {
+      String msgBase = "testRedeployTopic";
+      int numMsg = 50;
+
+      deployDestinations();
+      redeployDestinations(true);
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer sub1 = sess.createConsumer(cTopic);
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         conn.start();
+
+         sendMessages(0, cTopic, msgBase, numMsg);
+         sendMessages(1, nTopic, msgBase, numMsg);
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub2.close();
+         sess.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   public void testRedeployTopicNoMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployTopicNoMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         conn.close();
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+         
+         redeployDestinations(true);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn.start();
+
+         sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+
+         sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+         sess.unsubscribe("sub1");
+         sess.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   public void testRedeployTopicWithMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployTopicWithMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         sess.createDurableSubscriber(cTopic, "sub1");
+         sess.createDurableSubscriber(nTopic, "sub2");
+         
+         conn.close();
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+         
+         redeployDestinations(false);
+         
+         checkEmpty(cTopic);
+         checkEmpty(nTopic);
+
+         sendMessages(0, cTopic, msgBase, numMsg);
+         sendMessages(2, nTopic, msgBase, numMsg);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn.start();
+         
+         MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+
+         sess.unsubscribe("sub1");
+         sess.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   private HashMap<String, TextMessage> msgSet = new HashMap<String, TextMessage>();
+   
+   //send some messages to topics and receive a few of them. Then do redeploy and try to receive the rest.
+   //also this is a valid test for https://jira.jboss.org/jira/browse/JBMESSAGING-1774
+   public void testRedeployTopicNoMessageLoss2() throws Exception
+   {
+      String msgBase = "testRedeployTopicNoMessageLoss2";
+      int numMsg = 500;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+         sendMessages(2, cTopic, msgBase+"cTopic", numMsg);
+         sendMessages(0, nTopic, msgBase+"nTopic", numMsg);
+         
+         //receive 10
+         conn.start();
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            assertEquals(msgBase + "cTopic" + i, rm.getText());
+            msgSet.remove(rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + "nTopic" + i, rm.getText());
+            msgSet.remove(rm.getText());
+         }
+         
+         conn.close();
+         
+         redeployDestinations(true);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn.start();
+
+         sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+
+         sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         boolean success = true;
+         for (int i = 10; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            if (rm == null)
+            {
+               success = false;
+            }
+            else
+            {
+               msgSet.remove(rm.getText());
+            }
+         }
+         
+         for (int i = 10; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub2.receive(5000);
+            log.info("--Message received: " + rm);
+            if (rm == null)
+            {
+               success = false;
+            }
+            else
+            {
+               msgSet.remove(rm.getText());
+            }
+         }
+         
+         if (!success)
+         {
+            log.info("=======test failed, missing messages: ");
+            Iterator<String> itmsg = msgSet.keySet().iterator();
+            while (itmsg.hasNext())
+            {
+               String key = itmsg.next();
+               TextMessage msg = msgSet.get(key);
+               log.info("=====> " + key + " <--> " + msg);
+            }
+         }
+         
+         sub1.close();
+         sub2.close();
+         sess.unsubscribe("sub1");
+         sess.unsubscribe("sub2");
+         
+         assertTrue(success);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   //send some messages to topics and receive a few within a tx, then redeploy and receive them all.
+   public void testRedeployTopicNoMessageLossTX() throws Exception
+   {
+      String msgBase = "testRedeployTopicNoMessageLossTX";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess1 = null;
+      Session sess2 = null;
+      
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess1 = conn.createSession(true, Session.SESSION_TRANSACTED);
+         sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+         
+         MessageConsumer sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+         
+         //receive 10
+         conn.start();
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sess1.commit();
+         sess2.rollback();
+         
+         conn.close();
+         
+         redeployDestinations(true);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn.start();
+
+         sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+
+         sub2 = sess1.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 10; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+         sess1.unsubscribe("sub1");
+         sess1.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   //send some messages to topics and receive a few within an XA tx, then redeploy and receive them all.
+   //sub1's tx is committed and sub2's is rolled back: afterwards sub1 resumes at message 10, sub2 at message 0.
+   public void testRedeployTopicNoMessageLossXA() throws Exception
+   {
+      String msgBase = "testRedeployTopicNoMessageLossXA";
+      int numMsg = 50;
+
+      deployDestinations();
+
+      //declared outside the try block so that the finally block can close whatever got opened
+      XAConnection xaconn1 = null;
+      XAConnection xaconn2 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+         xaconn1 = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, 1);
+         xaconn1.setClientID("client-id-0");
+         XASession xasess1 = xaconn1.createXASession();
+         XAResource xres1 = xasess1.getXAResource();
+         xaconn1.start();
+         Session sess1 = xasess1.getSession();
+
+         xaconn2 = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, 0);
+         xaconn2.setClientID("client-id-1");
+         XASession xasess2 = xaconn2.createXASession();
+         XAResource xres2 = xasess2.getXAResource();
+         xaconn2.start();
+         Session sess2 = xasess2.getSession();
+
+         Xid xid1 = new MessagingXid(("bq1" + cTopic).getBytes(), 42, cTopic.toString().getBytes());
+         Xid xid2 = new MessagingXid(("bq1" + nTopic).getBytes(), 42, nTopic.toString().getBytes());
+
+         xres1.start(xid1, XAResource.TMNOFLAGS);
+         xres2.start(xid2, XAResource.TMNOFLAGS);
+
+         MessageConsumer sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+
+         xres1.end(xid1, XAResource.TMSUCCESS);
+         xres2.end(xid2, XAResource.TMSUCCESS);
+
+         xres1.commit(xid1, true);
+         xres2.rollback(xid2);
+
+         sub1.close();
+         sub2.close();
+
+         xaconn1.close();
+         xaconn2.close();
+
+         redeployDestinations(true);
+
+         conn1 = createConnectionOnServer(cf, 0);
+         conn1.setClientID("client-id-0");
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn2 = createConnectionOnServer(cf, 0);
+         conn2.setClientID("client-id-1");
+         sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn1.start();
+         conn2.start();
+
+         sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+
+         sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+
+         for (int i = 10; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+
+         sub1.close();
+         sub2.close();
+         sess1.unsubscribe("sub1");
+         sess2.unsubscribe("sub2");
+      }
+      finally
+      {
+         //JMS requires close() on an already closed connection to be a no-op, so closing again here is safe
+         if (xaconn1 != null)
+         {
+            xaconn1.close();
+         }
+         if (xaconn2 != null)
+         {
+            xaconn2.close();
+         }
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+   
+   //send some messages to queues and do redeploy, then receive them.
+   public void testRedeployQueueNoMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+   
+   //send some messages to queues and do redeploy, dropping all messages
+   public void testRedeployQueueWithMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployQueueWithMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      redeployDestinations(false);
+      
+      checkEmpty(cQueue);
+      checkEmpty(nQueue);
+      
+      sendMessages(0, cQueue, msgBase, numMsg);
+      sendMessages(1, nQueue, msgBase, numMsg);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //send some messages to queues and receive a few of them. Then do redeploy and try to receive the rest.
+   public void testRedeployQueueNoMessageLoss2() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLoss2";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessages(0, cQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
+      receiveMessages(0, nQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
+      
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 10, numMsg - 10, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+   
+   //send some messages to queues and receive a few within a tx, then redeploy and receive them all.
+   public void testRedeployQueueNoMessageLossTX() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossTX";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, false, "commit", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, false, "rollback", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //send some messages to queues and receive a few within an XA tx, then redeploy and receive them all.
+   public void testRedeployQueueNoMessageLossXA() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "commit", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "rollback", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //send some messages to queues and receive a few within an XA tx, then redeploy and receive them all.
+   public void testRedeployQueueNoMessageLossXA2() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA2";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(0, cQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "prepared", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, false);
+      
+      recoverMessages(0, cQueue, msgBase, 0, 10);
+   }
+
+   //send some messages to queues and receive a few within an XA tx, then redeploy and receive them all.
+   public void testRedeployQueueNoMessageLossXA3() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA3";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, nQueue, msgBase, 0, 15, true, "prepared", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(2, nQueue, msgBase, 15, numMsg - 15, Session.CLIENT_ACKNOWLEDGE, true);
+      
+      //have to recover from node 0.
+      recoverMessages(0, nQueue, msgBase, 0, 15);
+   }
+
+   //send some messages to queues and receive a few within an XA tx, then redeploy and receive them all.
+   public void testRedeployQueueNoMessageLossXA4() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA4";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "noaction", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "noaction", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   /*
+    * Deploy the following destinations and look them up through the JNDI context of node 0:
+    * 
+    * 1. clustered2NonclusteredQueue : a clustered queue (deployed on every node), later re-deployed as non-clustered.
+    * 2. nonclustered2ClusteredQueue : a non-clustered queue (at node0), later re-deployed as clustered.
+    * 3. clustered2NonclusteredTopic : a clustered topic (deployed on every node), later re-deployed as non-clustered.
+    * 4. nonclustered2ClusteredTopic : a non-clustered topic (at node0), later re-deployed as clustered.
+    * Finally checks that the clustered queue and topic can also be looked up through node 1.
+    */
+   private void deployDestinations() throws Exception
+   {
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.deployQueue("clustered2NonclusteredQueue", i);
+         ServerManagement.deployTopic("clustered2NonclusteredTopic", i);
+      }
+      ServerManagement.deployQueue("nonclustered2ClusteredQueue");
+      ServerManagement.deployTopic("nonclustered2ClusteredTopic");
+
+      cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+      nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+      cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+      nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");
+
+      //the clustered queue was deployed on every node, so a lookup through node 1 must succeed
+      try
+      {
+         Queue anotherQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+         assertNotNull(anotherQueue);
+      }
+      catch (NamingException e)
+      {
+         fail("queue/clustered2NonclusteredQueue should be bound on node 1 after deploy: " + e);
+      }
+      
+      //same check for the clustered topic
+      try
+      {
+         Topic anotherTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+         assertNotNull(anotherTopic);
+      }
+      catch (NamingException e)
+      {
+         fail("topic/clustered2NonclusteredTopic should be bound on node 1 after deploy: " + e);
+      }
+   }
+
+   private void redeployDestinations(boolean keepMessage) throws Exception
+   {
+      if (keepMessage)
+      {
+         redeployDestinationsWithMessage();
+      }
+      else
+      {
+         redeployDestinationsNoMessage();
+      }
+   }
+   
+   private void redeployDestinationsNoMessage() throws Exception
+   {
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.stop(i);
+      }
+      
+      //Restart nodes
+      for (int i = 0; i < nodeCount; i++)
+      {
+         startDefaultServer(i, overrides, false);
+         ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
+      }
+      
+      //redeploy
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.deployQueue("nonclustered2ClusteredQueue", i, false);
+         ServerManagement.deployTopic("nonclustered2ClusteredTopic", i, false);
+      }
+      ServerManagement.deployQueue("clustered2NonclusteredQueue", false);
+      ServerManagement.deployTopic("clustered2NonclusteredTopic", false);      
+
+
+      cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+      nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+      cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+      nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");  
+      
+      cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+      try
+      {
+         Queue nonExistQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+         fail("The queue " + nonExistQueue + " should not exist after redeploy");
+      }
+      catch (NamingException e)
+      {
+         //ok
+      }
+      
+      try
+      {
+         Topic nonExistTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+         fail("The topic " + nonExistTopic + " should not exist after redeploy");
+      }
+      catch (NamingException e)
+      {
+         //ok
+      }
+
+
+   }
+   
+   /**
+    * Stops every node, restarts them all, redeploys the four test destinations and
+    * refreshes the cached destination and connection factory references from node 0.
+    *
+    * The nonclustered2Clustered* destinations are deployed on each node index in turn;
+    * the clustered2Nonclustered* destinations are deployed through the overload that
+    * takes no node index. Afterwards the clustered2Nonclustered* names must not be
+    * resolvable through the naming context of node 1.
+    *
+    * NOTE(review): the <code>false</code> passed to startDefaultServer presumably keeps
+    * the persisted messages across the restart (hence "WithMessage") - confirm.
+    *
+    * @throws Exception if a node cannot be stopped or started, or a deployment or lookup fails
+    */
+   private void redeployDestinationsWithMessage() throws Exception
+   {
+      //take the whole cluster down before anything is restarted
+      for (int node = 0; node < nodeCount; node++)
+      {
+         ServerManagement.stop(node);
+      }
+
+      //bring each node back and recreate its naming context
+      for (int node = 0; node < nodeCount; node++)
+      {
+         startDefaultServer(node, overrides, false);
+         ic[node] = new InitialContext(ServerManagement.getJNDIEnvironment(node));
+      }
+
+      //deploy on every node
+      for (int node = 0; node < nodeCount; node++)
+      {
+         ServerManagement.deployQueue("nonclustered2ClusteredQueue", node);
+         ServerManagement.deployTopic("nonclustered2ClusteredTopic", node);
+      }
+
+      //deploy through the overload without a node index
+      ServerManagement.deployQueue("clustered2NonclusteredQueue");
+      ServerManagement.deployTopic("clustered2NonclusteredTopic");
+
+      //refresh the references held by the test from node 0
+      cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+      nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+      cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+      nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");
+
+      cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+      //node 1 must not be able to resolve the queue
+      try
+      {
+         Queue unexpectedQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+         fail("The queue " + unexpectedQueue + " should not exist after redeploy");
+      }
+      catch (NamingException expected)
+      {
+         //expected: the name is not bound on node 1
+      }
+
+      //node 1 must not be able to resolve the topic either
+      try
+      {
+         Topic unexpectedTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+         fail("The topic " + unexpectedTopic + " should not exist after redeploy");
+      }
+      catch (NamingException expected)
+      {
+         //expected: the name is not bound on node 1
+      }
+   }
+
+   /**
+    * Sends <code>numMsg</code> text messages to <code>dest</code> over a connection
+    * created on the given server. Each sent message is also stored in
+    * <code>msgSet</code>, keyed by its text.
+    *
+    * @param serverIndex index of the server the connection is created on
+    * @param dest destination the messages are produced to
+    * @param msgBase prefix of every message text; the 0-based loop counter is appended
+    * @param numMsg number of messages to send
+    * @throws Exception if the connection cannot be created or a message cannot be sent
+    */
+   private void sendMessages(int serverIndex, Destination dest, String msgBase, int numMsg) throws Exception
+   {
+      Connection conn = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, serverIndex);
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(dest);
+         log.info("-----Sending messages to: " + dest);
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage msg = sess.createTextMessage(msgBase + i);
+            producer.send(msg);
+            log.info("----message sent: " + msg.getText());
+            msgSet.put(msg.getText(), msg);
+         }
+      }
+      finally
+      {
+         //conn is still null when createConnectionOnServer itself failed; closing it
+         //unconditionally would hide that failure behind a NullPointerException
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /**
+    * Receives <code>numMsg</code> text messages from <code>dest</code> on a
+    * non-transacted session and asserts that they arrive in order with the texts
+    * <code>msgBase + startIndex</code>, <code>msgBase + (startIndex + 1)</code>, ...
+    *
+    * @param serverIndex index of the server the connection is created on
+    * @param dest destination to consume from
+    * @param msgBase prefix of the expected message texts
+    * @param startIndex number appended to <code>msgBase</code> for the first expected message
+    * @param numMsg number of messages to receive
+    * @param ack acknowledge mode passed to <code>createSession</code>; with
+    *        <code>Session.CLIENT_ACKNOWLEDGE</code> the last received message is acknowledged
+    * @param checkEmpty when true, <code>checkEmpty</code> is called on the destination
+    *        after the messages have been received
+    * @throws Exception if the connection cannot be created or receiving fails
+    */
+   private void receiveMessages(int serverIndex, Destination dest, String msgBase, int startIndex,
+                                int numMsg, int ack, boolean checkEmpty) throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = createConnectionOnServer(cf, serverIndex);
+         Session sess = conn.createSession(false, ack);
+         conn.start();
+
+         MessageConsumer receiver = sess.createConsumer(dest);
+         TextMessage msg = null;
+
+         for (int i = 0; i < numMsg; i++)
+         {
+            String expectedText = msgBase + (startIndex + i);
+            msg = (TextMessage)receiver.receive(5000);
+            //receive returns null on timeout; fail with a readable message instead of an NPE
+            assertNotNull("Timed out waiting for message " + expectedText + " from " + dest, msg);
+            assertEquals(expectedText, msg.getText());
+         }
+
+         //msg is still null when numMsg is 0, so there is nothing to acknowledge
+         if (ack == Session.CLIENT_ACKNOWLEDGE && msg != null)
+         {
+            msg.acknowledge();
+         }
+
+         if (checkEmpty)
+         {
+            if (dest instanceof Queue)
+            {
+               checkEmpty((Queue)dest);
+            }
+            else
+            {
+               checkEmpty((Topic)dest);
+            }
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   /**
+    * Receives messages transactionally and then finishes the transaction as requested.
+    *
+    * Outcome values:
+    * <ul>
+    * <li>commit -- commit the transaction</li>
+    * <li>rollback -- roll back the transaction</li>
+    * <li>prepared -- prepare the transaction but do not commit it (XA only)</li>
+    * </ul>
+    * Any other value finishes nothing explicitly: an XA transaction is only ended,
+    * a local one is left as it is, and the connection is then closed.
+    *
+    * @param serverIndex index of the server the connection is created on
+    * @param dest destination to consume from
+    * @param msgBase prefix of the expected message texts
+    * @param startIndex number appended to <code>msgBase</code> for the first expected message
+    * @param numMsg number of messages to receive
+    * @param isXA true to receive inside an XA transaction, false for a local transacted session
+    * @param outcome how to finish the transaction, see above
+    * @param checkEmpty not used by this method.
+    *        NOTE(review): receiveMessages honours the same flag - confirm whether it should be here too
+    * @throws Exception if a connection cannot be created or receiving or finishing the transaction fails
+    */
+   private void receiveMessagesTX(int serverIndex, Destination dest, String msgBase, int startIndex, 
+                                  int numMsg, boolean isXA, String outcome, boolean checkEmpty) throws Exception
+   {
+      Connection conn = null;
+      XAConnection xaconn = null;
+      Session sess = null;
+      XASession xasess = null;
+      XAResource xres = null;
+      Xid xid = null;
+      try
+      {
+         if (isXA)
+         {
+            xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+            xasess = xaconn.createXASession();
+            xres = xasess.getXAResource();
+            xaconn.start();
+            sess = xasess.getSession();
+
+            xid = new MessagingXid(("bq1" + dest).getBytes(), 42, dest.toString().getBytes());
+
+            xres.start(xid, XAResource.TMNOFLAGS);
+         }
+         else
+         {
+            //local tx
+            conn = createConnectionOnServer(cf, serverIndex);
+            sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+            conn.start();
+         }
+
+         //starting receiving
+         MessageConsumer cons = sess.createConsumer(dest);
+         for (int i = 0; i < numMsg; i++)
+         {
+            String expectedText = msgBase + (i + startIndex);
+            TextMessage rm = (TextMessage)cons.receive(5000);
+            //receive returns null on timeout; fail with a readable message instead of an NPE
+            assertNotNull("Timed out waiting for message " + expectedText + " from " + dest, rm);
+            assertEquals(expectedText, rm.getText());
+         }
+
+         //ending
+         if (isXA)
+         {
+            xres.end(xid, XAResource.TMSUCCESS);
+
+            if ("commit".equals(outcome))
+            {
+               //just one-phase is enough for the test
+               xres.commit(xid, true);
+            }
+            else if ("rollback".equals(outcome))
+            {
+               xres.rollback(xid);
+            }
+            else if ("prepared".equals(outcome))
+            {
+               xres.prepare(xid);
+            }
+         }
+         else
+         {
+            //local
+            if ("commit".equals(outcome))
+            {
+               sess.commit();
+            }
+            else if ("rollback".equals(outcome))
+            {
+               sess.rollback();
+            }
+         }
+      }
+      finally
+      {
+         //only one of the two connections is ever opened, depending on isXA
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (xaconn != null)
+         {
+            xaconn.close();
+         }
+      }
+   }
+
+   /**
+    * Recovers the messages held by an in-doubt XA transaction by rolling it back.
+    *
+    * The method asserts that a recovery scan returns exactly one Xid, that nothing can be
+    * consumed from <code>dest</code> before the rollback, and that after the rollback
+    * <code>numMsg</code> messages arrive in order with the texts
+    * <code>msgBase + startIndex</code>, <code>msgBase + (startIndex + 1)</code>, ...
+    * Finally <code>checkEmpty</code> is called on the destination.
+    *
+    * @param serverIndex index of the server both connections are created on
+    * @param dest destination the recovered messages are consumed from
+    * @param msgBase prefix of the expected message texts
+    * @param startIndex number appended to <code>msgBase</code> for the first expected message
+    * @param numMsg number of messages expected after the rollback
+    * @throws Exception if a connection cannot be created or recovery, rollback or receiving fails
+    */
+   private void recoverMessages(int serverIndex, Destination dest, 
+                                String msgBase, int startIndex, int numMsg) throws Exception
+   {
+      Connection conn = null;
+      XAConnection xaconn = null;
+
+      try
+      {
+         xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+         XASession xasess = xaconn.createXASession();
+         XAResource xres = xasess.getXAResource();
+         xaconn.start();
+
+         Xid[] xids = xres.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = xres.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+
+         conn = this.createConnectionOnServer(cf, serverIndex);
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn.start();
+
+         //nothing may be delivered before the transaction is rolled back
+         MessageConsumer cons = sess.createConsumer(dest);
+         TextMessage rm = (TextMessage)cons.receive(5000);
+         assertNull(rm);
+
+         xres.rollback(xids[0]);
+
+         conn.stop();
+         conn.start();
+         for (int i = 0; i < numMsg; i++)
+         {
+            String expectedText = msgBase + (startIndex + i);
+            rm = (TextMessage)cons.receive(5000);
+            //receive returns null on timeout; fail with a readable message instead of an NPE
+            assertNotNull("Timed out waiting for recovered message " + expectedText + " from " + dest, rm);
+            assertEquals(expectedText, rm.getText());
+         }
+
+         if (dest instanceof Queue)
+         {
+            checkEmpty((Queue)dest);
+         }
+         else
+         {
+            checkEmpty((Topic)dest);
+         }
+      }
+      finally
+      {
+         //nested so that conn is still closed when closing xaconn throws
+         try
+         {
+            if (xaconn != null)
+            {
+               xaconn.close();
+            }
+         }
+         finally
+         {
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+      }
+   }
+
+
+}

Modified: branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2010-11-09 19:50:52 UTC (rev 8129)
+++ branches/Branch_JBossMessaging_1_4_6_GA_JBMESSAGING-1805_JBMESSAGING-1822_JBMESSAGING-1809/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2010-11-19 15:59:47 UTC (rev 8130)
@@ -392,6 +392,14 @@
       {
          return null;
       }
+
+      public void setClustered(boolean isClustered) //no-op: this fake ignores the clustered flag
+      {
+      } //NOTE(review): presumably only here to satisfy an interface of the enclosing fake - confirm
+
+      public void staticMerge(org.jboss.messaging.core.contract.Queue queue) throws Exception //no-op: the queue is ignored, nothing is merged
+      {
+      } //NOTE(review): presumably only here to satisfy an interface of the enclosing fake - confirm
    }
 
    // Inner classes -------------------------------------------------



More information about the jboss-cvs-commits mailing list