[hornetq-commits] JBoss hornetq SVN: r9389 - trunk/tests/jms-tests/src/org/hornetq/jms/tests.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Jul 9 10:02:02 EDT 2010
Author: ataylor
Date: 2010-07-09 10:02:01 -0400 (Fri, 09 Jul 2010)
New Revision: 9389
Modified:
trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
Log:
added test for http://community.jboss.org/message/551534#551534
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java 2010-07-09 13:59:30 UTC (rev 9388)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/TransactedSessionTest.java 2010-07-09 14:02:01 UTC (rev 9389)
@@ -13,19 +13,19 @@
package org.hornetq.jms.tests;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.*;
import org.hornetq.jms.tests.util.ProxyAssertSupport;
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * $Id$
+ * <p/>
+ * $Id$
*/
public class TransactedSessionTest extends JMSTestCase
{
@@ -58,7 +58,7 @@
Message m = c.receive(1000);
ProxyAssertSupport.assertNotNull(m);
- ProxyAssertSupport.assertEquals("one", ((TextMessage)m).getText());
+ ProxyAssertSupport.assertEquals("one", ((TextMessage) m).getText());
ProxyAssertSupport.assertFalse(m.getJMSRedelivered());
ProxyAssertSupport.assertEquals(1, m.getIntProperty("JMSXDeliveryCount"));
@@ -106,7 +106,7 @@
conn.start();
- TextMessage mRec1 = (TextMessage)consumer1.receive(2000);
+ TextMessage mRec1 = (TextMessage) consumer1.receive(2000);
ProxyAssertSupport.assertNotNull(mRec1);
ProxyAssertSupport.assertEquals("igloo", mRec1.getText());
@@ -114,7 +114,7 @@
sess1.rollback(); // causes redelivery for session
- mRec1 = (TextMessage)consumer1.receive(2000);
+ mRec1 = (TextMessage) consumer1.receive(2000);
ProxyAssertSupport.assertEquals("igloo", mRec1.getText());
ProxyAssertSupport.assertTrue(mRec1.getJMSRedelivered());
@@ -129,7 +129,9 @@
}
}
- /** Test redelivery works ok for Topic */
+ /**
+ * Test redelivery works ok for Topic
+ */
public void testRedeliveredTopic() throws Exception
{
Connection conn = null;
@@ -149,14 +151,14 @@
sess.commit();
- TextMessage mRec = (TextMessage)consumer.receive(2000);
+ TextMessage mRec = (TextMessage) consumer.receive(2000);
ProxyAssertSupport.assertEquals("igloo", mRec.getText());
ProxyAssertSupport.assertFalse(mRec.getJMSRedelivered());
sess.rollback();
- mRec = (TextMessage)consumer.receive(2000);
+ mRec = (TextMessage) consumer.receive(2000);
ProxyAssertSupport.assertNotNull(mRec);
ProxyAssertSupport.assertEquals("igloo", mRec.getText());
@@ -194,7 +196,7 @@
sess.commit();
- TextMessage mRec = (TextMessage)consumer.receive(2000);
+ TextMessage mRec = (TextMessage) consumer.receive(2000);
ProxyAssertSupport.assertEquals("igloo", mRec.getText());
sess.commit();
@@ -206,11 +208,11 @@
sess.commit();
- mRec = (TextMessage)consumer.receive(2000);
+ mRec = (TextMessage) consumer.receive(2000);
ProxyAssertSupport.assertEquals("rollback", mRec.getText());
sess.rollback();
- TextMessage mRec2 = (TextMessage)consumer.receive(2000);
+ TextMessage mRec2 = (TextMessage) consumer.receive(2000);
sess.commit();
@@ -394,6 +396,7 @@
* Rollback the session.
* Verify messages aren't received by consumer.
*/
+
public void testSendRollbackTopic() throws Exception
{
Connection conn = null;
@@ -455,12 +458,12 @@
sess.commit();
- TextMessage mRec = (TextMessage)consumer.receive(2000);
+ TextMessage mRec = (TextMessage) consumer.receive(2000);
ProxyAssertSupport.assertEquals("igloo", mRec.getText());
ProxyAssertSupport.assertFalse(mRec.getJMSRedelivered());
sess.rollback();
- mRec = (TextMessage)consumer.receive(2000);
+ mRec = (TextMessage) consumer.receive(2000);
ProxyAssertSupport.assertEquals("igloo", mRec.getText());
ProxyAssertSupport.assertTrue(mRec.getJMSRedelivered());
@@ -503,7 +506,7 @@
conn.start();
- TextMessage tm = (TextMessage)cons.receive(1000);
+ TextMessage tm = (TextMessage) cons.receive(1000);
ProxyAssertSupport.assertNotNull(tm);
ProxyAssertSupport.assertEquals("a message", tm.getText());
@@ -519,7 +522,7 @@
cons = sess2.createConsumer(HornetQServerTestCase.queue1);
- tm = (TextMessage)cons.receive(1000);
+ tm = (TextMessage) cons.receive(1000);
ProxyAssertSupport.assertEquals("a message", tm.getText());
@@ -552,7 +555,7 @@
sess.commit();
- TextMessage mRec = (TextMessage)consumer.receive(1000);
+ TextMessage mRec = (TextMessage) consumer.receive(1000);
ProxyAssertSupport.assertNotNull(mRec);
log.trace("Got 1");
ProxyAssertSupport.assertNotNull(mRec);
@@ -566,7 +569,7 @@
sess.commit();
log.trace("Receiving 2");
- mRec = (TextMessage)consumer.receive(1000);
+ mRec = (TextMessage) consumer.receive(1000);
ProxyAssertSupport.assertNotNull(mRec);
log.trace("Received 2");
@@ -575,7 +578,7 @@
sess.rollback();
- TextMessage mRec2 = (TextMessage)consumer.receive(1000);
+ TextMessage mRec2 = (TextMessage) consumer.receive(1000);
ProxyAssertSupport.assertNotNull(mRec2);
ProxyAssertSupport.assertEquals("rollback", mRec2.getText());
@@ -679,6 +682,126 @@
}
+ public void _testSendCommitQueueCommitsInOrder() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = JMSTestCase.cf.createConnection();
+
+ Session producerSess = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(HornetQServerTestCase.queue1);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(HornetQServerTestCase.queue1);
+ CountDownLatch latch = new CountDownLatch(1);
+ conn.start();
+ myReceiver myReceiver = new myReceiver(latch, conn);
+ consumer.setMessageListener(myReceiver);
+ long lastBatchTime = System.currentTimeMillis();
+ int sentId = 0;
+ boolean started = false;
+ // Send some messages
+ while (true)
+ {
+ try
+ {
+ Message m = producerSess.createMessage();
+ m.setIntProperty("foo", sentId);
+ sentId++;
+ producer.send(m);
+
+ if (sentId == 1 || System.currentTimeMillis() - lastBatchTime > 50)
+ {
+ lastBatchTime = System.currentTimeMillis();
+ producerSess.commit();
+ }
+ }
+ catch (JMSException e)
+ {
+ //ignore connection closed by consumer
+ }
+
+ // wait for the first message to be received before we continue sending
+ if(!started)
+ {
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ started = true;
+ }
+ else
+ {
+ if(myReceiver.failed)
+ {
+ fail(myReceiver.e.getMessage());
+ }
+ }
+ }
+
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ removeAllMessages(HornetQServerTestCase.queue1.getQueueName(), true);
+ }
+
+ }
+
+ class myReceiver implements MessageListener
+ {
+ int count = 0;
+ boolean started = false;
+ private CountDownLatch startLatch;
+ boolean failed = false;
+ Exception e = null;
+
+ private Connection conn;
+
+ public myReceiver(CountDownLatch startLatch, Connection conn)
+ {
+ this.startLatch = startLatch;
+ this.conn = conn;
+ }
+
+ public void onMessage(Message message)
+ {
+ if(!started)
+ {
+ startLatch.countDown();
+ started = true;
+ }
+ try
+ {
+ int foo = message.getIntProperty("foo");
+ if(foo != count)
+ {
+ e = new Exception("received out of order expected " + count + " received " + foo);
+ failed = true;
+ conn.close();
+ }
+ count++;
+ }
+ catch (JMSException e)
+ {
+
+ this.e = e;
+ failed = true;
+ try
+ {
+ conn.close();
+ }
+ catch (JMSException e1)
+ {
+ e1.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ }
/**
* Test IllegateStateException is thrown if commit is called on a non-transacted session
*/
@@ -719,7 +842,6 @@
* Do not commit the receiving session.
* Close the connection
* Create a new connection, session and consumer - verify messages are redelivered
- *
*/
public void testAckNoCommitQueue() throws Exception
{
@@ -870,6 +992,7 @@
* Rollback the session.
* Verify messages aren't received by consumer.
*/
+
public void testSendRollbackQueue() throws Exception
{
Connection conn = null;
@@ -1033,6 +1156,7 @@
/*
* Send multiple messages in multiple contiguous sessions
*/
+
public void testSendMultipleQueue() throws Exception
{
Connection conn = null;
More information about the hornetq-commits
mailing list