[hornetq-commits] JBoss hornetq SVN: r9389 - trunk/tests/jms-tests/src/org/hornetq/jms/tests.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 9 10:02:02 EDT 2010


Author: ataylor
Date: 2010-07-09 10:02:01 -0400 (Fri, 09 Jul 2010)
New Revision: 9389

Modified:
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
Log:
added test for http://community.jboss.org/message/551534#551534

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java	2010-07-09 13:59:30 UTC (rev 9388)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java	2010-07-09 14:02:01 UTC (rev 9389)
@@ -13,19 +13,19 @@
 
 package org.hornetq.jms.tests;
 
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.*;
 
 import org.hornetq.jms.tests.util.ProxyAssertSupport;
 
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * $Id$
+ *         <p/>
+ *         $Id$
  */
 public class TransactedSessionTest extends JMSTestCase
 {
@@ -58,7 +58,7 @@
          Message m = c.receive(1000);
          ProxyAssertSupport.assertNotNull(m);
 
-         ProxyAssertSupport.assertEquals("one", ((TextMessage)m).getText());
+         ProxyAssertSupport.assertEquals("one", ((TextMessage) m).getText());
          ProxyAssertSupport.assertFalse(m.getJMSRedelivered());
          ProxyAssertSupport.assertEquals(1, m.getIntProperty("JMSXDeliveryCount"));
 
@@ -106,7 +106,7 @@
 
          conn.start();
 
-         TextMessage mRec1 = (TextMessage)consumer1.receive(2000);
+         TextMessage mRec1 = (TextMessage) consumer1.receive(2000);
          ProxyAssertSupport.assertNotNull(mRec1);
 
          ProxyAssertSupport.assertEquals("igloo", mRec1.getText());
@@ -114,7 +114,7 @@
 
          sess1.rollback(); // causes redelivery for session
 
-         mRec1 = (TextMessage)consumer1.receive(2000);
+         mRec1 = (TextMessage) consumer1.receive(2000);
          ProxyAssertSupport.assertEquals("igloo", mRec1.getText());
          ProxyAssertSupport.assertTrue(mRec1.getJMSRedelivered());
 
@@ -129,7 +129,9 @@
       }
    }
 
-   /** Test redelivery works ok for Topic */
+   /**
+    * Test redelivery works ok for Topic
+    */
    public void testRedeliveredTopic() throws Exception
    {
       Connection conn = null;
@@ -149,14 +151,14 @@
 
          sess.commit();
 
-         TextMessage mRec = (TextMessage)consumer.receive(2000);
+         TextMessage mRec = (TextMessage) consumer.receive(2000);
 
          ProxyAssertSupport.assertEquals("igloo", mRec.getText());
          ProxyAssertSupport.assertFalse(mRec.getJMSRedelivered());
 
          sess.rollback();
 
-         mRec = (TextMessage)consumer.receive(2000);
+         mRec = (TextMessage) consumer.receive(2000);
 
          ProxyAssertSupport.assertNotNull(mRec);
          ProxyAssertSupport.assertEquals("igloo", mRec.getText());
@@ -194,7 +196,7 @@
 
          sess.commit();
 
-         TextMessage mRec = (TextMessage)consumer.receive(2000);
+         TextMessage mRec = (TextMessage) consumer.receive(2000);
          ProxyAssertSupport.assertEquals("igloo", mRec.getText());
 
          sess.commit();
@@ -206,11 +208,11 @@
 
          sess.commit();
 
-         mRec = (TextMessage)consumer.receive(2000);
+         mRec = (TextMessage) consumer.receive(2000);
          ProxyAssertSupport.assertEquals("rollback", mRec.getText());
          sess.rollback();
 
-         TextMessage mRec2 = (TextMessage)consumer.receive(2000);
+         TextMessage mRec2 = (TextMessage) consumer.receive(2000);
 
          sess.commit();
 
@@ -394,6 +396,7 @@
     * Rollback the session.
     * Verify messages aren't received by consumer.
     */
+
    public void testSendRollbackTopic() throws Exception
    {
       Connection conn = null;
@@ -455,12 +458,12 @@
 
          sess.commit();
 
-         TextMessage mRec = (TextMessage)consumer.receive(2000);
+         TextMessage mRec = (TextMessage) consumer.receive(2000);
          ProxyAssertSupport.assertEquals("igloo", mRec.getText());
          ProxyAssertSupport.assertFalse(mRec.getJMSRedelivered());
 
          sess.rollback();
-         mRec = (TextMessage)consumer.receive(2000);
+         mRec = (TextMessage) consumer.receive(2000);
          ProxyAssertSupport.assertEquals("igloo", mRec.getText());
          ProxyAssertSupport.assertTrue(mRec.getJMSRedelivered());
 
@@ -503,7 +506,7 @@
 
          conn.start();
 
-         TextMessage tm = (TextMessage)cons.receive(1000);
+         TextMessage tm = (TextMessage) cons.receive(1000);
          ProxyAssertSupport.assertNotNull(tm);
 
          ProxyAssertSupport.assertEquals("a message", tm.getText());
@@ -519,7 +522,7 @@
 
          cons = sess2.createConsumer(HornetQServerTestCase.queue1);
 
-         tm = (TextMessage)cons.receive(1000);
+         tm = (TextMessage) cons.receive(1000);
 
          ProxyAssertSupport.assertEquals("a message", tm.getText());
 
@@ -552,7 +555,7 @@
 
       sess.commit();
 
-      TextMessage mRec = (TextMessage)consumer.receive(1000);
+      TextMessage mRec = (TextMessage) consumer.receive(1000);
       ProxyAssertSupport.assertNotNull(mRec);
       log.trace("Got 1");
       ProxyAssertSupport.assertNotNull(mRec);
@@ -566,7 +569,7 @@
       sess.commit();
 
       log.trace("Receiving 2");
-      mRec = (TextMessage)consumer.receive(1000);
+      mRec = (TextMessage) consumer.receive(1000);
       ProxyAssertSupport.assertNotNull(mRec);
 
       log.trace("Received 2");
@@ -575,7 +578,7 @@
 
       sess.rollback();
 
-      TextMessage mRec2 = (TextMessage)consumer.receive(1000);
+      TextMessage mRec2 = (TextMessage) consumer.receive(1000);
       ProxyAssertSupport.assertNotNull(mRec2);
       ProxyAssertSupport.assertEquals("rollback", mRec2.getText());
 
@@ -679,6 +682,126 @@
 
    }
 
+   public void _testSendCommitQueueCommitsInOrder() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         conn = JMSTestCase.cf.createConnection();
+
+         Session producerSess = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+         MessageProducer producer = producerSess.createProducer(HornetQServerTestCase.queue1);
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer consumer = consumerSession.createConsumer(HornetQServerTestCase.queue1);
+         CountDownLatch latch = new CountDownLatch(1);
+         conn.start();
+         myReceiver myReceiver = new myReceiver(latch, conn);
+         consumer.setMessageListener(myReceiver);
+         long lastBatchTime = System.currentTimeMillis();
+         int sentId = 0;
+         boolean started = false;
+         // Send some messages
+         while (true)
+         {
+            try
+            {
+               Message m = producerSess.createMessage();
+               m.setIntProperty("foo", sentId);
+               sentId++;
+               producer.send(m);
+
+               if (sentId == 1 || System.currentTimeMillis() - lastBatchTime > 50)
+               {
+                  lastBatchTime = System.currentTimeMillis();
+                  producerSess.commit();
+               }
+            }
+            catch (JMSException e)
+            {
+               //ignore connection closed by consumer
+            }
+
+            // wait for the first message to be received before we continue sending
+            if(!started)
+            {
+               assertTrue(latch.await(5, TimeUnit.SECONDS));
+               started = true;
+            }
+            else
+            {
+               if(myReceiver.failed)
+               {
+                  fail(myReceiver.e.getMessage());
+               }
+            }
+         }
+
+
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         removeAllMessages(HornetQServerTestCase.queue1.getQueueName(), true);
+      }
+
+   }
+
+   class myReceiver implements MessageListener
+   {
+      int count = 0;
+      boolean started = false;
+      private CountDownLatch startLatch;
+      boolean failed = false;
+      Exception e = null;
+
+      private Connection conn;
+
+      public myReceiver(CountDownLatch startLatch, Connection conn)
+      {
+         this.startLatch = startLatch;
+         this.conn = conn;
+      }
+
+      public void onMessage(Message message)
+      {
+         if(!started)
+         {
+             startLatch.countDown();
+            started = true;
+         }
+         try
+         {
+            int foo = message.getIntProperty("foo");
+            if(foo != count)
+            {
+               e = new Exception("received out of order expected " + count + " received " + foo);
+               failed = true;
+               conn.close();
+            }
+            count++;
+         }
+         catch (JMSException e)
+         {
+
+            this.e = e;
+            failed = true;
+            try
+            {
+               conn.close();
+            }
+            catch (JMSException e1)
+            {
+               e1.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+         }
+      }
+   }
    /**
    * Test IllegalStateException is thrown if commit is called on a non-transacted session
     */
@@ -719,7 +842,6 @@
     * Do not commit the receiving session.
     * Close the connection
     * Create a new connection, session and consumer - verify messages are redelivered
-    *
     */
    public void testAckNoCommitQueue() throws Exception
    {
@@ -870,6 +992,7 @@
     * Rollback the session.
     * Verify messages aren't received by consumer.
     */
+
    public void testSendRollbackQueue() throws Exception
    {
       Connection conn = null;
@@ -1033,6 +1156,7 @@
    /*
     * Send multiple messages in multiple contiguous sessions
     */
+
    public void testSendMultipleQueue() throws Exception
    {
       Connection conn = null;



More information about the hornetq-commits mailing list