[jboss-cvs] JBoss Messaging SVN: r7864 - branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 23 03:17:51 EDT 2009


Author: gaohoward
Date: 2009-10-23 03:17:50 -0400 (Fri, 23 Oct 2009)
New Revision: 7864

Added:
   branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
Log:
add test


Added: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	                        (rev 0)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2009-10-23 07:17:50 UTC (rev 7864)
@@ -0,0 +1,387 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.jboss.test.messaging.jms.clustering.XAFailoverTest.DummyXAResource;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.InVMInitialContextFactory;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
+
+/**
+ * Test for https://jira.jboss.org/jira/browse/JBMESSAGING-1742
+ *
+ * Deploys clustered and non-clustered destinations, restarts every node, re-deploys each
+ * destination with the opposite clustering mode and then sends and receives messages.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class DestinationRedeployTest extends ClusteringTestBase
+{
+   private static final String C2N_QUEUE = "clustered2NonclusteredQueue";
+
+   private static final String N2C_QUEUE = "nonclustered2ClusteredQueue";
+
+   private static final String C2N_TOPIC = "clustered2NonclusteredTopic";
+
+   private static final String N2C_TOPIC = "nonclustered2ClusteredTopic";
+
+   //clustered2NonclusteredQueue
+   private Queue cQueue;
+
+   //nonclustered2ClusteredQueue
+   private Queue nQueue;
+
+   //clustered2NonclusteredTopic
+   private Topic cTopic;
+
+   //nonclustered2ClusteredTopic
+   private Topic nTopic;
+
+   private ServiceContainer sc;
+
+   private TransactionManager tm;
+
+   //transaction that was associated with the thread before the test, restored in tearDown()
+   private Transaction suspended;
+
+   public DestinationRedeployTest(String name)
+   {
+      super(name);
+   }
+
+   /**
+    * Runs the base class set-up, then starts a "transaction" ServiceContainer and looks up the
+    * TransactionManager that the XA receive path uses.
+    */
+   protected void setUp() throws Exception
+   {
+      //tearDown() calls super.tearDown(), and the tests rely on the inherited cf, nodeCount and
+      //overrides, so the base fixture has to be set up as well
+      super.setUp();
+
+      sc = new ServiceContainer("transaction");
+
+      //Don't drop the tables again!
+      sc.start(false);
+
+      InitialContext localIc = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
+
+      tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
+
+      //park any transaction already bound to this thread; tearDown() puts it back
+      suspended = tm.suspend();
+   }
+
+   /**
+    * Tears down the base fixture, stops the local ServiceContainer and resumes the transaction
+    * suspended in setUp(). Each step runs even if the previous one throws.
+    */
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         super.tearDown();
+      }
+      finally
+      {
+         try
+         {
+            sc.stop();
+         }
+         finally
+         {
+            if (suspended != null)
+            {
+               tm.resume(suspended);
+            }
+         }
+      }
+   }
+
+   /**
+    * Deploys the destinations, re-deploys them with the clustering mode flipped, sends numMsg
+    * messages to each of the four destinations and receives from clustered2NonclusteredQueue.
+    */
+   public void testRedeploy0() throws Exception
+   {
+      String msgBase = "testRedeploy0";
+      int numMsg = 50;
+
+      deployDestinations();
+      redeployDestinations(false);
+
+      sendMessages(0, cQueue, msgBase, numMsg);
+      sendMessages(1, nQueue, msgBase, numMsg);
+      sendMessages(0, cTopic, msgBase, numMsg);
+      sendMessages(1, nTopic, msgBase, numMsg);
+
+      //NOTE(review): every receive targets cQueue and expects the full range from index 0, while
+      //nQueue, cTopic and nTopic are never drained - confirm this is the intended scenario
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, null, Session.AUTO_ACKNOWLEDGE, false);
+      receiveMessages(2, cQueue, msgBase, 0, numMsg, null, Session.CLIENT_ACKNOWLEDGE, false);
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, null, Session.CLIENT_ACKNOWLEDGE, false);
+      receiveMessages(3, cQueue, msgBase, 0, numMsg, null, Session.AUTO_ACKNOWLEDGE, false);
+   }
+
+   /*
+    * Deploy the following destinations:
+    *
+    * 1. clustered2NonclusteredQueue : a clustered queue to be re-deployed as non-clustered.
+    * 2. nonclustered2ClusteredQueue : a non-clustered queue (at node0) to be re-deployed as clustered.
+    * 3. clustered2NonclusteredTopic : a clustered topic to be re-deployed as non-clustered.
+    * 4. nonclustered2ClusteredTopic : a non-clustered topic (at node0) to be re-deployed as clustered.
+    *
+    * The destination fields are resolved once everything is deployed.
+    */
+   private void deployDestinations() throws Exception
+   {
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.deployQueue(C2N_QUEUE, i);
+         ServerManagement.deployTopic(C2N_TOPIC, i);
+      }
+      ServerManagement.deployQueue(N2C_QUEUE);
+      ServerManagement.deployTopic(N2C_TOPIC);
+
+      resolveDestinations();
+   }
+
+   /*
+    * Fills in cQueue, nQueue, cTopic and nTopic. Nothing else assigns these fields, so without
+    * this step every send and receive would be handed a null destination.
+    */
+   private void resolveDestinations() throws Exception
+   {
+      Connection conn = null;
+      try
+      {
+         //the non-clustered destinations are deployed at node0 only, so resolve everything there
+         conn = createConnectionOnServer(cf, 0);
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         //createQueue/createTopic only build an identity for an already deployed destination
+         cQueue = sess.createQueue(C2N_QUEUE);
+         nQueue = sess.createQueue(N2C_QUEUE);
+         cTopic = sess.createTopic(C2N_TOPIC);
+         nTopic = sess.createTopic(N2C_TOPIC);
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /*
+    * Stops and restarts every node, then deploys each destination with the clustering mode
+    * opposite to the one used in deployDestinations().
+    *
+    * keepMessage is not used yet.
+    */
+   private void redeployDestinations(boolean keepMessage) throws Exception
+   {
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.stop(i);
+      }
+
+      //Restart nodes
+      for (int i = 0; i < nodeCount; i++)
+      {
+         startDefaultServer(i, overrides, i == 0);
+      }
+
+      //redeploy
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.deployQueue(N2C_QUEUE, i);
+         ServerManagement.deployTopic(N2C_TOPIC, i);
+      }
+      ServerManagement.deployQueue(C2N_QUEUE);
+      ServerManagement.deployTopic(C2N_TOPIC);
+   }
+
+   /*
+    * Sends numMsg text messages with bodies msgBase + 0 .. msgBase + (numMsg - 1) to dest over a
+    * connection created on the given server.
+    */
+   private void sendMessages(int serverIndex, Destination dest, String msgBase, int numMsg) throws Exception
+   {
+      Connection conn = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, serverIndex);
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(dest);
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage msg = sess.createTextMessage(msgBase + i);
+            producer.send(msg);
+         }
+      }
+      finally
+      {
+         //conn is still null when the connection could not be created
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /*
+    * Receives numMsg messages from dest on the given server and checks that their bodies are
+    * msgBase + startIndex, msgBase + (startIndex + 1), ... in that order.
+    *
+    * isXA selects the transaction mode: null = no transaction (the session uses ack),
+    * TRUE = XA transaction through the TransactionManager, FALSE = locally transacted session.
+    * With checkEmpty the destination is passed to checkEmpty() afterwards.
+    */
+   private void receiveMessages(int serverIndex, Destination dest, String msgBase, int startIndex,
+                                int numMsg, Boolean isXA, int ack, boolean checkEmpty) throws Exception
+   {
+      //isXA may legitimately be null, so unbox it only once and only when present
+      boolean xa = isXA != null && isXA.booleanValue();
+
+      Connection conn = null;
+      XAConnection xaconn = null;
+
+      try
+      {
+         Session sess = null;
+         XASession xasess = null;
+         XAResource res = null;
+
+         if (isXA == null)
+         {
+            //no tx
+            conn = createConnectionOnServer(cf, serverIndex);
+            sess = conn.createSession(false, ack);
+            conn.start();
+         }
+         else if (xa)
+         {
+            //xa
+            xaconn = createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+            xasess = xaconn.createXASession();
+            res = xasess.getXAResource();
+            sess = xasess.getSession();
+
+            xaconn.start();
+         }
+         else
+         {
+            //local tx
+            conn = createConnectionOnServer(cf, serverIndex);
+            sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+            conn.start();
+         }
+
+         if (xa)
+         {
+            tm.begin();
+
+            Transaction tx = tm.getTransaction();
+
+            tx.enlistResource(res);
+
+            //Enlist a dummy XAResource to force 2pc
+            XAResource dummy = new DummyXAResource();
+
+            tx.enlistResource(dummy);
+         }
+
+         MessageConsumer receiver = sess.createConsumer(dest);
+         TextMessage msg = null;
+         for (int i = 0; i < numMsg; i++)
+         {
+            msg = (TextMessage)receiver.receive(5000);
+            //receive() returns null on timeout; fail with a message instead of an NPE
+            assertNotNull("no message at index " + (startIndex + i) + " from server " + serverIndex, msg);
+            assertEquals(msgBase + (startIndex + i), msg.getText());
+         }
+
+         if (isXA == null)
+         {
+            //acknowledging the last message acknowledges everything consumed by the session
+            if (ack == Session.CLIENT_ACKNOWLEDGE && msg != null)
+            {
+               msg.acknowledge();
+            }
+         }
+         else if (xa)
+         {
+            //xa commit
+            tm.commit();
+         }
+         else
+         {
+            sess.commit();
+         }
+
+         if (checkEmpty)
+         {
+            if (dest instanceof Queue)
+            {
+               checkEmpty((Queue)dest);
+            }
+            else
+            {
+               checkEmpty((Topic)dest);
+            }
+         }
+      }
+      finally
+      {
+         //only one of the two connections is ever opened
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (xaconn != null)
+         {
+            xaconn.close();
+         }
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list