[jboss-cvs] JBossAS SVN: r57866 - trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 26 17:09:56 EDT 2006


Author: rachmatowicz at jboss.com
Date: 2006-10-26 17:09:55 -0400 (Thu, 26 Oct 2006)
New Revision: 57866

Added:
   trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/TemporarySessionConnectionUnitTestCase.java
   trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/UnackedUnitTestCase.java
   trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/XAResourceUnitTestCase.java
Log:
Further refactored generic JMS test cases

Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/TemporarySessionConnectionUnitTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/TemporarySessionConnectionUnitTestCase.java	2006-10-26 20:31:45 UTC (rev 57865)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/TemporarySessionConnectionUnitTestCase.java	2006-10-26 21:09:55 UTC (rev 57866)
@@ -0,0 +1,119 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.jbossmessaging.test;
+
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.naming.Context;
+
+import org.jboss.test.jbossmessaging.JMSTestCase;
+
+/**
+ * Tests that a temporary queue can be consumed from a second session of the
+ * connection that created it, but not from a different connection.
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
+ * @version <tt>$Revision: 37406 $</tt>
+ */
+public class TemporarySessionConnectionUnitTestCase extends JMSTestCase
+{
+   static String QUEUE_FACTORY = "ConnectionFactory"; // JNDI name looked up by connect()
+
+   QueueConnection queueConnection; // connection opened by the latest connect() call
+
+   public TemporarySessionConnectionUnitTestCase(String name) throws Exception
+   {
+      super(name);
+   }
+
+   public void testTemporaryDifferentSession() throws Exception
+   {
+      connect();
+      try
+      {
+         QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryQueue temp = session.createTemporaryQueue(); // created through the first session
+         session.createConsumer(temp);
+         session.close(); // only the session is closed; the connection stays open
+         session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+         session.createConsumer(temp).close(); // passes if no exception: same connection, new session
+      }
+      finally
+      {
+         disconnect();
+      }
+   }
+
+   public void testTemporaryDifferentConnection() throws Exception
+   {
+      connect();
+      try
+      {
+         QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+         TemporaryQueue temp = session.createTemporaryQueue(); // created on the first connection
+         session.createConsumer(temp);
+         disconnect(); // closes the connection that created temp
+         connect(); // queueConnection now refers to a newly created connection
+         session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+         try
+         {
+            session.createConsumer(temp).close();
+            fail("Should not be able to consume a temporary on different connection");
+         }
+         catch (JMSException expected) // the test passes only when the consumer is rejected
+         {
+         }
+      }
+      finally
+      {
+         disconnect();
+      }
+   }
+
+   protected void connect() throws Exception
+   {
+      Context context = getInitialContext();
+      QueueConnectionFactory queueFactory = (QueueConnectionFactory) context.lookup(QUEUE_FACTORY);
+      queueConnection = queueFactory.createQueueConnection(); // overwrites any previous reference
+      queueConnection.start();
+
+      getLog().debug("Connection established.");
+   }
+
+   protected void disconnect()
+   {
+      try
+      {
+         if (queueConnection != null)
+            queueConnection.close();
+      }
+      catch (Exception ignored) // best-effort cleanup; presumably so a failed close cannot mask the test outcome
+      {
+      }
+
+      getLog().debug("Connection closed."); // logged even if there was no connection or close() failed
+   }
+}

Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/UnackedUnitTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/UnackedUnitTestCase.java	2006-10-26 20:31:45 UTC (rev 57865)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/UnackedUnitTestCase.java	2006-10-26 21:09:55 UTC (rev 57866)
@@ -0,0 +1,690 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.jbossmessaging.test;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.naming.Context;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Category;
+import org.jboss.test.jbossmessaging.JMSTestCase;
+
+/**
+ * Tests what happens to unacknowledged messages once their session or connection is closed
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author
+ * @version $Revision: 56718 $
+ */
+public class UnackedUnitTestCase extends JMSTestCase
+{
+   // Provider specific: JNDI names of the connection factories
+   static String TOPIC_FACTORY = "ConnectionFactory";
+   static String QUEUE_FACTORY = "ConnectionFactory";
+
+   static String TEST_QUEUE = "queue/testQueue"; // JNDI name; also carries the synch message in the topic tests
+   static String TEST_TOPIC = "topic/testTopic"; // JNDI name of the non-durable test topic
+   static String TEST_DURABLE_TOPIC = "topic/testDurableTopic"; // JNDI name used with the durable subscription "test"
+
+   static byte[] PAYLOAD = new byte[10]; // 10 zero bytes written into every test message
+
+   static Context context; // created once by setUp(); null means "not initialised yet"
+   static QueueConnection queueConnection; // shared by all tests; re-created by runUnackedQueue()
+   static TopicConnection topicConnection; // shared by all tests; re-created by runUnackedTopic()
+   static TopicConnection topicDurableConnection; // opened with user "john"; re-created by runUnackedDurableTopic()
+
+   public static Test suite() throws Exception
+   {
+      // JBAS-3580, the execution order of tests in this test case is important
+      // so it must be defined explicitly when running under some JVMs
+      TestSuite suite = new TestSuite();
+      suite.addTest(new UnackedUnitTestCase("testUnackedQueue"));
+      suite.addTest(new UnackedUnitTestCase("testUnackedMultipleSession"));
+      suite.addTest(new UnackedUnitTestCase("testUnackedMultipleConnection"));
+      suite.addTest(new UnackedUnitTestCase("testUnackedTopic"));
+      suite.addTest(new UnackedUnitTestCase("testUnackedDurableTopic"));
+      suite.addTest(new UnackedUnitTestCase("testDummyLast")); // last: it closes the shared static connections
+      
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+      String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
+      String module = loader.getResource(resourceName).toString(); // NOTE(review): getResource() returns null if the file is missing -> NPE here
+
+      return getDeploySetup(suite, module); // presumably wraps the suite with deployment of module; helper is not defined in this file
+   }
+   
+   /**
+    * Constructs the test case
+    *
+    * @param name           name of the test method to run (see suite())
+    * @exception Exception  if the superclass constructor fails
+    */
+   public UnackedUnitTestCase(String name) throws Exception
+   {
+      super(name);
+   }
+
+   /**
+    * Sends messages to the test queue, receives them all without acknowledging,
+    * closes the connection and checks that every message is still in the queue.
+    * @param persistence    JMS delivery mode used for the sends
+    * @exception Exception  on any JMS or JNDI failure
+    */
+   public void runUnackedQueue(final int persistence) throws Exception
+   {
+      drainQueue(); // remove whatever is already in the queue
+
+      final int iterationCount = getIterationCount();
+
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueSender sender = session.createSender(queue);
+
+      Message message = session.createBytesMessage();
+      ((BytesMessage)message).writeBytes(PAYLOAD);
+
+      for (int i = 0; i < iterationCount; i++)
+         sender.send(message, persistence, 4, 0); // priority 4, time-to-live 0
+
+      session.close();
+
+      session = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); // acknowledge() is never called on this session
+      queue = (Queue)context.lookup(TEST_QUEUE);
+      QueueReceiver receiver = session.createReceiver(queue);
+      queueConnection.start();
+      message = receiver.receive(50);
+      int c = 0;
+      while (message != null) // c ends up as the number of non-null receives
+      {
+         message = receiver.receive(50);
+         c++;
+      }
+      assertTrue("Should have received all data unacked", c == iterationCount);
+
+      queueConnection.close(); // closed while the received messages are still unacknowledged
+      QueueConnectionFactory queueFactory = (QueueConnectionFactory)context.lookup(QUEUE_FACTORY);
+      queueConnection = queueFactory.createQueueConnection(); // replaces the closed shared connection
+
+      assertTrue("Queue should be full", drainQueue() == iterationCount);
+
+   }
+
+   /**
+    * Splits the queued messages between two client-acknowledge sessions of one
+    * connection; only the second acknowledges, so the first half must remain.
+    * @param persistence    JMS delivery mode used for the sends
+    * @exception Exception  on any JMS or JNDI failure
+    */
+   public void runUnackedMultipleSession(final int persistence) throws Exception
+   {
+      drainQueue(); // remove whatever is already in the queue
+
+      final int iterationCount = getIterationCount();
+
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueSender sender = session.createSender(queue);
+
+      Message message = session.createBytesMessage();
+      ((BytesMessage)message).writeBytes(PAYLOAD);
+
+      for (int i = 0; i < iterationCount; i++)
+         sender.send(message, persistence, 4, 0); // priority 4, time-to-live 0
+
+      session.close();
+
+      QueueSession session1 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
+      queue = (Queue)context.lookup(TEST_QUEUE);
+      QueueReceiver receiver1 = session1.createReceiver(queue);
+      QueueSession session2 = queueConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
+      QueueReceiver receiver2 = session2.createReceiver(queue);
+      queueConnection.start();
+
+      // Read half from session1
+      int c = 0;
+      for (int l = 0; l < iterationCount/2; l++)
+      {
+         message = receiver1.receive(50);
+         if (message != null)
+            c++;
+      }
+      assertTrue("Should have received half data unacked", c == iterationCount/2);
+
+      // Read the rest from session2
+      c = 0;
+      Message lastMessage = null;
+      while (message != null) // message is non-null here once the assert above has passed, so the loop is entered
+      {
+         message = receiver2.receive(50);
+         if (message != null)
+         {
+            c++;
+            lastMessage = message;
+         }
+      }
+      assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2);
+
+      // Close session1, the messages are unacked and should go back in the queue
+      session1.close();
+
+      // Acknowledge messages on session2 and close it
+      lastMessage.acknowledge(); // a single acknowledge() on the last message received by session2
+      session2.close();
+
+      queueConnection.stop();
+
+      assertTrue("Session1 messages should be available", drainQueue() == iterationCount/2);
+
+   }
+
+   /**
+    * Splits the queued messages between client-acknowledge sessions of two new
+    * connections; only the second acknowledges, so the first half must remain.
+    * @param persistence    JMS delivery mode used for the sends
+    * @exception Exception  on any JMS or JNDI failure
+    */
+   public void runUnackedMultipleConnection(final int persistence) throws Exception
+   {
+      drainQueue(); // remove whatever is already in the queue
+
+      final int iterationCount = getIterationCount();
+
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueSender sender = session.createSender(queue);
+
+      Message message = session.createBytesMessage();
+      ((BytesMessage)message).writeBytes(PAYLOAD);
+
+      for (int i = 0; i < iterationCount; i++)
+         sender.send(message, persistence, 4, 0); // priority 4, time-to-live 0
+
+      session.close();
+
+      QueueConnectionFactory queueFactory = (QueueConnectionFactory)context.lookup(QUEUE_FACTORY);
+      QueueConnection queueConnection1 = queueFactory.createQueueConnection(); // local connection, separate from the shared static one
+      QueueSession session1 = queueConnection1.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
+      queue = (Queue)context.lookup(TEST_QUEUE);
+      QueueReceiver receiver1 = session1.createReceiver(queue);
+
+      QueueConnection queueConnection2 = queueFactory.createQueueConnection();
+      QueueSession session2 = queueConnection2.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
+      QueueReceiver receiver2 = session2.createReceiver(queue);
+
+      queueConnection1.start();
+      queueConnection2.start();
+
+      // Read half from session1
+      int c = 0;
+      for (int l = 0; l < iterationCount/2; l++)
+      {
+         message = receiver1.receive(50);
+         if (message != null)
+            c++;
+      }
+      assertTrue("Should have received half data unacked", c == iterationCount/2);
+
+      // Read the rest from session2
+      Message lastMessage = null;
+      c = 0;
+      while (message != null) // message is non-null here once the assert above has passed, so the loop is entered
+      {
+         message = receiver2.receive(50);
+         if (message != null)
+         {
+            c++;
+            lastMessage = message;
+         }
+      }
+      assertTrue("Should have received all data unacked", c == iterationCount - iterationCount/2);
+
+      // Close connection 1, the messages are unacked and should go back in the queue
+      queueConnection1.close();
+
+      // Acknowledge messages for connection 2 and close it
+      lastMessage.acknowledge(); // a single acknowledge() on the last message received by session2
+      queueConnection2.close();
+
+      assertTrue("Connection1 messages should be available", drainQueue() == iterationCount/2);
+
+   }
+
+   /**
+    * Publishes to the test topic from a helper thread while a client-acknowledge
+    * listener consumes without acknowledging; the topic must be empty afterwards.
+    * @param persistence    JMS delivery mode used for the publishes
+    * @exception Exception  on any JMS or JNDI failure, or if interrupted while waiting
+    */
+   public void runUnackedTopic(final int persistence) throws Exception
+   {
+      drainQueue(); // the test queue carries the synch message, so it must start empty
+      drainTopic();
+
+      final int iterationCount = getIterationCount();
+      final Category log = getLog();
+
+      Thread sendThread =
+         new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+
+                  TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+                  Topic topic = (Topic)context.lookup(TEST_TOPIC);
+
+                  TopicPublisher publisher = session.createPublisher(topic);
+
+                  waitForSynchMessage(); // do not publish before the main thread signals
+
+                  BytesMessage message = session.createBytesMessage();
+                  message.writeBytes(PAYLOAD);
+
+                  for (int i = 0; i < iterationCount; i++)
+                  {
+                     publisher.publish(message, persistence, 4, 0); // priority 4, time-to-live 0
+                  }
+
+                  session.close();
+               }
+               catch (Exception e)
+               {
+                  log.error("error", e); // only logged: a publisher failure is not propagated to the test thread
+               }
+            }
+         };
+
+      TopicSession session = topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
+      Topic topic = (Topic)context.lookup(TEST_TOPIC);
+      TopicSubscriber subscriber = session.createSubscriber(topic);
+
+
+      MyMessageListener listener = new MyMessageListener(iterationCount, log);
+
+      queueConnection.start(); // the synch message is received over the queue connection
+      sendThread.start();
+      subscriber.setMessageListener(listener);
+      topicConnection.start();
+      sendSynchMessage(); // sent only after the listener is registered and the topic connection started
+      synchronized (listener)
+      {
+         while (listener.i < iterationCount) // loop, not if: wait() may return without a notify (spurious wakeup)
+            listener.wait();
+      }
+      sendThread.join();
+      topicConnection.close(); // closed without any message having been acknowledged
+      TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
+      topicConnection = topicFactory.createTopicConnection(); // replaces the closed shared connection
+      queueConnection.stop();
+      assertTrue("Topic should be empty", drainTopic() == 0);
+   }
+
+   /**
+    * Publishes to the durable test topic from a helper thread while the durable
+    * subscription "test" consumes without acknowledging; all messages must remain.
+    * @param persistence    JMS delivery mode used for the publishes
+    * @exception Exception  on any JMS or JNDI failure, or if interrupted while waiting
+    */
+   public void runUnackedDurableTopic(final int persistence) throws Exception
+   {
+      drainQueue(); // the test queue carries the synch message, so it must start empty
+      drainDurableTopic();
+
+      final int iterationCount = getIterationCount();
+      final Category log = getLog();
+
+      Thread sendThread =
+         new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+
+                  TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // publishes over the non-durable connection
+                  Topic topic = (Topic)context.lookup(TEST_DURABLE_TOPIC);
+
+                  TopicPublisher publisher = session.createPublisher(topic);
+
+                  waitForSynchMessage(); // do not publish before the main thread signals
+
+                  BytesMessage message = session.createBytesMessage();
+                  message.writeBytes(PAYLOAD);
+
+                  for (int i = 0; i < iterationCount; i++)
+                  {
+                     publisher.publish(message, persistence, 4, 0); // priority 4, time-to-live 0
+                  }
+
+                  session.close();
+               }
+               catch (Exception e)
+               {
+                  log.error("error", e); // only logged: a publisher failure is not propagated to the test thread
+               }
+            }
+         };
+
+      TopicSession session = topicDurableConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
+      Topic topic = (Topic)context.lookup(TEST_DURABLE_TOPIC);
+      TopicSubscriber subscriber = session.createDurableSubscriber(topic, "test");
+
+      MyMessageListener listener = new MyMessageListener(iterationCount, log);
+
+      queueConnection.start(); // the synch message is received over the queue connection
+      sendThread.start();
+      subscriber.setMessageListener(listener);
+      topicDurableConnection.start();
+      sendSynchMessage(); // sent only after the listener is registered and the connection started
+      synchronized (listener)
+      {
+         while (listener.i < iterationCount) // loop, not if: wait() may return without a notify (spurious wakeup)
+            listener.wait();
+      }
+
+      sendThread.join();
+      topicDurableConnection.close(); // closed without any message having been acknowledged
+      TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
+      topicDurableConnection = topicFactory.createTopicConnection("john", "needle"); // replaces the closed shared connection
+      queueConnection.stop();
+      assertTrue("Topic should be full", drainDurableTopic() == iterationCount);
+   }
+
+   /**
+    * Runs the unacked queue scenario with non-persistent, then persistent, delivery
+    *
+    * @exception Exception  if either run fails
+    */
+   public void testUnackedQueue() throws Exception
+   {
+
+      getLog().debug("Starting UnackedQueue test");
+
+      runUnackedQueue(DeliveryMode.NON_PERSISTENT);
+      runUnackedQueue(DeliveryMode.PERSISTENT);
+
+      getLog().debug("UnackedQueue passed");
+   }
+
+   /**
+    * Runs the two-session unacked scenario with non-persistent, then persistent, delivery
+    *
+    * @exception Exception  if either run fails
+    */
+   public void testUnackedMultipleSession() throws Exception
+   {
+
+      getLog().debug("Starting UnackedMultipleSession test");
+
+      runUnackedMultipleSession(DeliveryMode.NON_PERSISTENT);
+      runUnackedMultipleSession(DeliveryMode.PERSISTENT);
+
+      getLog().debug("UnackedMultipleSession passed");
+   }
+
+   /**
+    * Runs the two-connection unacked scenario with non-persistent, then persistent, delivery
+    *
+    * @exception Exception  if either run fails
+    */
+   public void testUnackedMultipleConnection() throws Exception
+   {
+
+      getLog().debug("Starting UnackedMultipleConnection test");
+
+      runUnackedMultipleConnection(DeliveryMode.NON_PERSISTENT);
+      runUnackedMultipleConnection(DeliveryMode.PERSISTENT);
+
+      getLog().debug("UnackedMultipleConnection passed");
+   }
+
+   /**
+    * Runs the unacked topic scenario with non-persistent, then persistent, delivery
+    *
+    * @exception Exception  if either run fails
+    */
+   public void testUnackedTopic() throws Exception
+   {
+
+      getLog().debug("Starting UnackedTopic test");
+
+      runUnackedTopic(DeliveryMode.NON_PERSISTENT);
+      runUnackedTopic(DeliveryMode.PERSISTENT);
+
+      getLog().debug("UnackedTopic passed");
+   }
+
+   /**
+    * Runs the unacked durable topic scenario with non-persistent, then persistent, delivery
+    *
+    * @exception Exception  if either run fails
+    */
+   public void testUnackedDurableTopic() throws Exception
+   {
+
+      getLog().debug("Starting UnackedDurableTopic test");
+
+      runUnackedDurableTopic(DeliveryMode.NON_PERSISTENT);
+      runUnackedDurableTopic(DeliveryMode.PERSISTENT);
+
+      getLog().debug("UnackedDurableTopic passed");
+   }
+
+   /**
+    * Cleanup step scheduled last by suite(): removes the durable subscription "test"
+    * and closes the three shared connections. It asserts nothing.
+    * @exception Exception  if unsubscribing or closing fails
+    */
+   public void testDummyLast() throws Exception
+   {
+
+      TopicSession session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      session.unsubscribe("test"); // same subscription name the durable topic test and drainDurableTopic() use
+
+      queueConnection.close();
+      topicConnection.close();
+      topicDurableConnection.close();
+   }
+
+   /**
+    * Creates the shared JNDI context and the three JMS connections the first time it runs
+    *
+    * @exception Exception  if a lookup or connection creation fails
+    */
+   protected void setUp() throws Exception
+   {
+      if (context == null) // static field: already set on later calls, which then do nothing
+      {
+         context = getInitialContext();
+
+         QueueConnectionFactory queueFactory = (QueueConnectionFactory)context.lookup(QUEUE_FACTORY);
+         queueConnection = queueFactory.createQueueConnection();
+
+         TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
+         topicConnection = topicFactory.createTopicConnection();
+         topicDurableConnection = topicFactory.createTopicConnection("john", "needle"); // user name and password
+
+         getLog().debug("Connection to JBossMQ established.");
+      }
+   }
+
+   // Empties the test queue and returns the number of messages that were removed
+   private int drainQueue() throws Exception
+   {
+      getLog().debug("Draining Queue");
+      queueConnection.start();
+
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueReceiver receiver = session.createReceiver(queue);
+      Message message = receiver.receive(1000); // waits up to 1000 ms; null ends the loop below
+      int c = 0;
+      while (message != null)
+      {
+         message = receiver.receive(1000);
+         c++;
+      }
+
+      getLog().debug("  Drained " + c + " messages from the queue");
+
+      session.close();
+
+      queueConnection.stop(); // leaves the shared connection stopped for the caller
+
+      return c;
+   }
+
+   // Consumes everything a new subscriber on the test topic receives and returns the count
+   private int drainTopic() throws Exception
+   {
+      getLog().debug("Draining Topic");
+      topicConnection.start();
+
+      final TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      Topic topic = (Topic)context.lookup(TEST_TOPIC);
+      TopicSubscriber subscriber = session.createSubscriber(topic); // non-durable subscriber created for this call only
+
+      Message message = subscriber.receive(1000); // waits up to 1000 ms; null ends the loop below
+      int c = 0;
+      while (message != null)
+      {
+         message = subscriber.receive(1000);
+         c++;
+      }
+
+      getLog().debug("  Drained " + c + " messages from the topic");
+
+      session.close();
+
+      topicConnection.stop(); // leaves the shared connection stopped for the caller
+
+      return c;
+   }
+
+   // Consumes everything pending for the durable subscription "test" and returns the count
+   private int drainDurableTopic() throws Exception
+   {
+      getLog().debug("Draining Durable Topic");
+      topicDurableConnection.start();
+
+      final TopicSession session = topicDurableConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      Topic topic = (Topic)context.lookup(TEST_DURABLE_TOPIC);
+      TopicSubscriber subscriber = session.createDurableSubscriber(topic, "test"); // same name as in runUnackedDurableTopic()
+
+      Message message = subscriber.receive(1000); // waits up to 1000 ms; null ends the loop below
+      int c = 0;
+      while (message != null)
+      {
+         message = subscriber.receive(1000);
+         c++;
+      }
+
+      getLog().debug("  Drained " + c + " messages from the durable topic");
+
+      session.close();
+
+      topicDurableConnection.stop(); // leaves the shared connection stopped for the caller
+
+      return c;
+   }
+
+   private void waitForSynchMessage() throws Exception // called from the publisher threads of the topic tests
+   {
+      getLog().debug("Waiting for Synch Message");
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueReceiver receiver = session.createReceiver(queue);
+      receiver.receive(); // no timeout: blocks until a message arrives on the test queue; the message itself is discarded
+      session.close();
+      getLog().debug("Got Synch Message");
+   }
+
+   private void sendSynchMessage() throws Exception // counterpart of waitForSynchMessage()
+   {
+      getLog().debug("Sending Synch Message");
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueSender sender = session.createSender(queue);
+
+      Message message = session.createMessage(); // plain message with no body; only its arrival matters
+      sender.send(message);
+
+      session.close();
+      getLog().debug("Sent Synch Message");
+   }
+
+   public class MyMessageListener // counts deliveries and notifies a waiter once the expected count is reached
+      implements MessageListener
+   {
+      public int i = 0; // messages received so far; the topic tests read it while holding this object's monitor
+
+      public int iterationCount; // number of messages after which notify() is called
+
+      public Category log;
+
+      public MyMessageListener(int iterationCount, Category log)
+      {
+         this.iterationCount = iterationCount;
+         this.log = log; 
+      }
+
+      public void onMessage(Message message) // the message is only counted; acknowledge() is never called on it
+      {
+         synchronized (this)
+         {
+            i++;
+            log.debug("Got message " + i);
+            if (i >= iterationCount)
+               this.notify(); // wakes a thread blocked in listener.wait()
+         }
+      }
+   }
+
+   public int getIterationCount() // number of messages each run* scenario sends or publishes
+   {
+      return 5;
+   }
+}

Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/XAResourceUnitTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/XAResourceUnitTestCase.java	2006-10-26 20:31:45 UTC (rev 57865)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/test/XAResourceUnitTestCase.java	2006-10-26 21:09:55 UTC (rev 57866)
@@ -0,0 +1,190 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.jbossmessaging.test;
+
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.XATopicConnection;
+import javax.jms.XATopicConnectionFactory;
+import javax.jms.XATopicSession;
+import javax.naming.InitialContext;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.test.jbossmessaging.JMSTestCase;
+
+/**
+ * XAResource tests: drives the XAResource of a JMS XA topic session by hand,
+ * covering suspend followed by work in a second transaction, and rollback
+ * after prepare.
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ */
+public class XAResourceUnitTestCase extends JMSTestCase
+{
+   static String XA_TOPIC_FACTORY = "XAConnectionFactory"; // JNDI name of the XA connection factory
+
+   static String TEST_TOPIC = "topic/testTopic"; // JNDI name of the topic both tests use
+
+   public XAResourceUnitTestCase(String name) throws Exception
+   {
+      super(name);
+   }
+
+   public void testXAResourceSuspendWorkCommit() throws Exception
+   {
+      InitialContext context = getInitialContext();
+      XATopicConnectionFactory factory = (XATopicConnectionFactory) context.lookup(XA_TOPIC_FACTORY);
+      Topic topic = (Topic) context.lookup(TEST_TOPIC);
+
+      XATopicConnection connection = factory.createXATopicConnection();
+      try
+      {
+         // Set up
+         XATopicSession xaSession = connection.createXATopicSession();
+         TopicSession session = xaSession.getTopicSession();
+         TopicPublisher publisher = session.createPublisher(topic);
+         Message message = session.createTextMessage(); // the same message object is published three times below
+
+         // Add the xa resource to xid1
+         MyXid xid1 = new MyXid();
+         XAResource resource = xaSession.getXAResource();
+         resource.start(xid1, XAResource.TMNOFLAGS);
+
+         // Do some work
+         publisher.publish(message);
+
+         // Suspend the transaction
+         resource.end(xid1, XAResource.TMSUSPEND);
+
+         // Add the xa resource to xid2
+         MyXid xid2 = new MyXid();
+         resource.start(xid2, XAResource.TMNOFLAGS);
+
+         // Do some work in the new transaction
+         publisher.publish(message);
+
+         // Commit the first transaction and end the branch
+         resource.end(xid1, XAResource.TMSUCCESS); // xid1 is ended while xid2 is still started
+         resource.commit(xid1, true); // true = one-phase commit, no prepare
+
+         // Do some more work in the new transaction
+         publisher.publish(message);
+
+         // Commit the second transaction and end the branch
+         resource.end(xid2, XAResource.TMSUCCESS);
+         resource.commit(xid2, true); // true = one-phase commit, no prepare
+      }
+      finally
+      {
+         connection.close();
+      }
+   }
+
+   public void testXAResourceRollbackAfterPrepare() throws Exception
+   {
+      InitialContext context = getInitialContext();
+      XATopicConnectionFactory factory = (XATopicConnectionFactory) context.lookup(XA_TOPIC_FACTORY);
+      Topic topic = (Topic) context.lookup(TEST_TOPIC);
+
+      XATopicConnection connection = factory.createXATopicConnection();
+      try
+      {
+         // Set up
+         XATopicSession xaSession = connection.createXATopicSession();
+         TopicSession session = xaSession.getTopicSession();
+         TopicSubscriber subscriber = session.createSubscriber(topic); // subscribed before the publish below
+         connection.start();
+         TopicPublisher publisher = session.createPublisher(topic);
+         Message message = session.createTextMessage();
+
+         // Publish a message using "AutoAcknowledge"
+         publisher.publish(message); // published before resource.start(), i.e. outside xid1
+
+         // Add the xa resource to xid1
+         MyXid xid1 = new MyXid();
+         XAResource resource = xaSession.getXAResource();
+         resource.start(xid1, XAResource.TMNOFLAGS);
+
+         // Receive the message
+         message = subscriber.receiveNoWait(); // does not block: returns null if nothing is available yet
+         if (message == null)
+            fail("No message?");
+
+         // Prepare the transaction
+         resource.end(xid1, XAResource.TMSUCCESS);
+         resource.prepare(xid1); // the return value (vote) is ignored
+         
+         // Rollback
+         resource.rollback(xid1);
+         
+         // Receive the message using "AutoAcknowledge"
+         message = subscriber.receiveNoWait(); // the rolled back receive is expected to be delivered again
+         if (message == null)
+            fail("No message after rollback?");
+      }
+      finally
+      {
+         connection.close();
+      }
+   }
+
+   public static junit.framework.Test suite() throws Exception
+   {
+       ClassLoader loader = Thread.currentThread().getContextClassLoader();
+       String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
+
+       return getDeploySetup(XAResourceUnitTestCase.class,
+               loader.getResource(resourceName).toString()); // NOTE(review): getResource() returns null if the file is missing -> NPE
+   }
+
+   public static class MyXid // minimal Xid: one-byte global id, fixed format id, no branch qualifier
+      implements Xid
+   {
+      static byte next = 0; // shared counter; being a byte it wraps to -128 after 127 instances
+
+      byte[] xid;
+
+      public MyXid()
+      {
+         xid = new byte[] { ++next }; // not synchronized
+      }
+ 
+      public int getFormatId()
+      {
+         return 314; // same constant for every MyXid
+      }
+
+      public byte[] getGlobalTransactionId()
+      {
+         return xid; // internal array returned without copying
+      }
+
+      public byte[] getBranchQualifier()
+      {
+         return null; // NOTE(review): null instead of a byte array; confirm the provider under test tolerates it
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list