[jboss-cvs] JBoss Messaging SVN: r2628 - trunk/tests/src/org/jboss/test/messaging/jms.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 2 16:10:23 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-05-02 16:10:22 -0400 (Wed, 02 May 2007)
New Revision: 2628

Modified:
   trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-950 - Adding a test that does not use any XASession.

I also refactored the code shared by the listeners into a superclass, because I was going to add a new MessageListener using the same features as the other ones; in the end, the new listener turned out not to be needed.

Since the refactoring was working, I kept it.

Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2007-05-02 19:48:27 UTC (rev 2627)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2007-05-02 20:10:22 UTC (rev 2628)
@@ -36,6 +36,10 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.jms.XAConnection;
+import javax.jms.Queue;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
 import javax.management.ObjectName;
 import javax.naming.InitialContext;
 
@@ -908,6 +912,76 @@
       conn.close();
       assertFalse(listener.failed);
    }
+
+   /*
+    *   This test will:
+    *     - Send two messages over a producer
+    *     - Receive one message over a consumer
+    *     - Call Recover
+    *     - Receive the second message
+    *     - The queue should be empty after that
+    *   Note: testMessageListenerAutoAck will test a similar case using MessageListeners
+    */
+   public void testRecoverAutoACK() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = cf.createConnection();
+         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer p = s.createProducer(queue);
+         p.setDeliveryMode(DeliveryMode.PERSISTENT);
+         Message m = s.createTextMessage("one");
+         p.send(m);
+         m = s.createTextMessage("two");
+         p.send(m);
+         conn.close();
+
+         conn = null;
+
+         assertRemainingMessages(2);
+
+         conn = cf.createConnection();
+
+         conn.start();
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         TextMessage messageReceived = (TextMessage)consumer.receive(1000);
+
+         assertNotNull(messageReceived);
+
+         assertEquals("one", messageReceived.getText());
+
+         session.recover();
+
+         messageReceived = (TextMessage)consumer.receive(1000);
+
+         assertEquals("two", messageReceived.getText());
+
+         consumer.close();
+
+         // I can't call xasession.close for this test as JCA layer would cache the session
+         // So.. keep this close commented!
+         //xasession.close();
+
+         assertRemainingMessages(0);
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         ServerManagement.undeployQueue("MyQueue2");
+      }
+   }
+
+
    
    public void testMessageListenerDupsOK() throws Exception
    {
@@ -1021,7 +1095,6 @@
       assertFalse(listener.failed);
    }
    
-   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
@@ -1029,30 +1102,41 @@
    // Private -------------------------------------------------------
    
    // Inner classes -------------------------------------------------
-   
-   
-   private class MessageListenerAutoAck implements MessageListener
+
+
+   private abstract class LatchListener implements MessageListener
    {
-      
-      private Latch latch = new Latch();
-      
-      private Session sess;
-      
-      private int count = 0;
-      
+      protected Latch latch = new Latch();
+
+      protected Session sess;
+
+      protected int count = 0;
+
       boolean failed;
-      
-      MessageListenerAutoAck(Session sess)
+
+      LatchListener(Session sess)
       {
          this.sess = sess;
       }
-      
+
       public void waitForMessages() throws InterruptedException
       {
          latch.acquire();
          Thread.sleep(500);
       }
 
+      public abstract void onMessage(Message m);
+      
+   }
+   
+   private class MessageListenerAutoAck extends LatchListener
+   {
+      
+      MessageListenerAutoAck(Session sess)
+      {
+         super(sess);
+      }
+      
       public void onMessage(Message m)
       {
          try
@@ -1118,27 +1202,13 @@
             
    }
    
-   private class MessageListenerDupsOK implements MessageListener
+   private class MessageListenerDupsOK  extends LatchListener
    {
       
-      private Latch latch = new Latch();
-      
-      private Session sess;
-      
-      private int count = 0;
-      
-      boolean failed;
-      
       MessageListenerDupsOK(Session sess)
       {
-         this.sess = sess;
+         super(sess);
       }
-      
-      public void waitForMessages() throws InterruptedException
-      {
-         latch.acquire();
-         Thread.sleep(500);
-      }
 
       public void onMessage(Message m)
       {
@@ -1206,28 +1276,14 @@
    }
    
    
-   private class MessageListenerClientAck implements MessageListener
+   private class MessageListenerClientAck extends LatchListener
    {
       
-      private Latch latch = new Latch();
-      
-      private Session sess;
-      
-      private int count = 0;
-      
-      boolean failed;
-      
       MessageListenerClientAck(Session sess)
       {
-         this.sess = sess;
+         super(sess);
       }
 
-      
-      public void waitForMessages() throws InterruptedException
-      {
-         latch.acquire();
-         Thread.sleep(500);
-      }
 
       public void onMessage(Message m)
       {
@@ -1327,31 +1383,15 @@
             
    }
    
-   private class MessageListenerTransactionalAck implements MessageListener
+   private class MessageListenerTransactionalAck extends LatchListener
    {
       
-      private Latch latch = new Latch();
-      
-      private Session sess;
-      
-      private int count = 0;
-      
-      boolean failed;
-      
       MessageListenerTransactionalAck(Session sess)
       {
-         this.sess = sess;
+         super(sess);
       }
 
       
-      public void waitForMessages() throws InterruptedException
-      {
-         latch.acquire();
-         
-         //Wait for postdeliver to be called
-         Thread.sleep(500);
-      }
-
       public void onMessage(Message m)
       {
          try




More information about the jboss-cvs-commits mailing list