[jboss-cvs] JBoss Messaging SVN: r4225 - trunk/tests/jms-tests/src/org/jboss/test/messaging/jms.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat May 17 05:40:56 EDT 2008
Author: ataylor
Date: 2008-05-17 05:40:56 -0400 (Sat, 17 May 2008)
New Revision: 4225
Modified:
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
readded acktest
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2008-05-17 09:01:18 UTC (rev 4224)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2008-05-17 09:40:56 UTC (rev 4225)
@@ -1,29 +1,1564 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.test.messaging.jms;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+
+import java.util.ArrayList;
+
/**
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * $Id: AcknowledgementTest.java 3173 2007-10-05 12:48:16Z timfox $
*/
-public class AcknowledgementTest
+public class AcknowledgementTest extends JMSTestCase
{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * Creates the test case.
+ *
+ * @param name the test name, handed unchanged to the superclass constructor
+ */
+ public AcknowledgementTest(String name)
+ {
+ super(name);
+ }
+
+ // TestCase overrides -------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * A topic must not hold on to a persistent message that was published while the topic had
+ * no subscribers.
+ */
+ public void testPersistentMessagesForTopicDropped() throws Exception
+ {
+    TopicConnection connection = null;
+
+    try
+    {
+       connection = cf.createTopicConnection();
+
+       // Transacted session: the publish below only takes effect on commit().
+       TopicSession session = connection.createTopicSession(true, 0);
+       TopicPublisher publisher = session.createPublisher(topic1);
+       publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+       publisher.publish(session.createTextMessage("testing123"));
+       session.commit();
+
+       connection.close();
+
+       checkEmpty(topic1);
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+    }
+ }
+
+ /**
+ * A topic must not hold on to persistent messages once its non-durable subscriber has closed,
+ * even if that subscriber rolled back its receipt of the message.
+ */
+ public void testPersistentMessagesForTopicDropped2() throws Exception
+ {
+    TopicConnection connection = null;
+
+    try
+    {
+       connection = cf.createTopicConnection();
+       connection.start();
+
+       TopicSession session = connection.createTopicSession(true, 0);
+       TopicPublisher publisher = session.createPublisher(topic1);
+       TopicSubscriber subscriber = session.createSubscriber(topic1);
+       publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+       publisher.publish(session.createTextMessage("testing123"));
+       session.commit();
+
+       // Receive the message, then undo the receipt by rolling the session back.
+       TextMessage received = (TextMessage)subscriber.receive(3000);
+       assertNotNull(received);
+       assertEquals("testing123", received.getText());
+       session.rollback();
+
+       connection.close();
+
+       checkEmpty(topic1);
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+    }
+ }
+
+ /**
+ * Checks that a message received in a transacted topic session is delivered again after the
+ * session is rolled back. The scenario is run once, then repeated on a fresh connection with
+ * an extra committed receive in between.
+ */
+ public void testRollbackRecover() throws Exception
+ {
+    TopicConnection connection = null;
+
+    try
+    {
+       // Part 1: publish, receive, roll back, receive again.
+       connection = cf.createTopicConnection();
+       TopicSession session = connection.createTopicSession(true, 0);
+       TopicPublisher publisher = session.createPublisher(topic1);
+       TopicSubscriber subscriber = session.createSubscriber(topic1);
+       connection.start();
+
+       publisher.publish(session.createTextMessage("testing123"));
+       session.commit();
+
+       TextMessage received = (TextMessage)subscriber.receive(3000);
+       assertNotNull(received);
+       assertEquals("testing123", received.getText());
+
+       session.rollback();
+
+       received = (TextMessage)subscriber.receive(3000);
+       assertNotNull(received);
+       assertEquals("testing123", received.getText());
+
+       connection.close();
+
+       // Part 2: same idea on a brand new connection.
+       connection = cf.createTopicConnection();
+       connection.start();
+
+       TopicSession secondSession = connection.createTopicSession(true, 0);
+       TopicPublisher secondPublisher = secondSession.createPublisher(topic1);
+       TopicSubscriber secondSubscriber = secondSession.createSubscriber(topic1);
+
+       Message outgoing = secondSession.createTextMessage("testing456");
+       secondPublisher.publish(outgoing);
+       secondSession.commit();
+
+       TextMessage committedReceive = (TextMessage)secondSubscriber.receive(3000);
+       assertNotNull(committedReceive);
+       assertEquals("testing456", committedReceive.getText());
+
+       secondSession.commit();
+
+       // Publish the same message object a second time.
+       secondPublisher.publish(outgoing);
+       secondSession.commit();
+
+       TextMessage beforeRollback = (TextMessage)secondSubscriber.receive(3000);
+       assertNotNull(beforeRollback);
+       assertEquals("testing456", beforeRollback.getText());
+
+       secondSession.rollback();
+
+       TextMessage afterRollback = (TextMessage)secondSubscriber.receive(3000);
+       assertNotNull(afterRollback);
+       assertEquals("testing456", afterRollback.getText());
+
+       secondSession.commit();
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+    }
+ }
+
+ /**
+ * Exercises sending and receiving through two transacted sessions on queue1 and checks the
+ * remaining-message count after each step:
+ * sends that are rolled back never show up; committed sends do; messages that are received
+ * but not committed still count as remaining; after a consumer rollback all messages are
+ * delivered again; and committing the consumer session leaves nothing behind.
+ */
+ public void testTransactionalAcknowledgement() throws Exception
+ {
+    Connection conn = null;
+
+    try
+    {
+       conn = cf.createConnection();
+
+       Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+       MessageProducer producer = producerSess.createProducer(queue1);
+
+       Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+       MessageConsumer consumer = consumerSess.createConsumer(queue1);
+       conn.start();
+
+       final int NUM_MESSAGES = 20;
+
+       // Send some messages, then throw them away by rolling back
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          Message m = producerSess.createMessage();
+          producer.send(m);
+       }
+
+       assertRemainingMessages(0);
+
+       producerSess.rollback();
+
+       // Send some messages again, this time for real
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          Message m = producerSess.createMessage();
+          producer.send(m);
+       }
+       assertRemainingMessages(0);
+
+       producerSess.commit();
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Sent messages");
+
+       int count = 0;
+       while (true)
+       {
+          Message m = consumer.receive(200);
+          if (m == null) break;
+          count++;
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Received " + count + " messages");
+
+       // JUnit's signature is assertEquals(expected, actual)
+       assertEquals(NUM_MESSAGES, count);
+
+       consumerSess.rollback();
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Session rollback called");
+
+       // Deliberately blocking receive: if not enough messages are redelivered,
+       // the test will time out
+       int i = 0;
+       for (; i < NUM_MESSAGES; i++)
+       {
+          Message m = consumer.receive();
+          assertNotNull(m);
+          log.trace("Received message " + i);
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Received " + i + " messages after rollback");
+
+       consumerSess.commit();
+
+       assertRemainingMessages(0);
+
+       checkEmpty(queue1);
+    }
+    finally
+    {
+       if (conn != null)
+       {
+          conn.close();
+       }
+    }
+ }
+
+ /**
+ * Send some messages, don't acknowledge them and verify that they are re-sent on recovery.
+ * Acknowledging the last redelivered message must then clear the whole queue.
+ */
+ public void testClientAcknowledgeNoAcknowledgement() throws Exception
+ {
+    Connection conn = null;
+
+    try
+    {
+       conn = cf.createConnection();
+
+       Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageProducer producer = producerSess.createProducer(queue1);
+
+       Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageConsumer consumer = consumerSess.createConsumer(queue1);
+       conn.start();
+
+       final int NUM_MESSAGES = 20;
+
+       // Send some messages
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          Message m = producerSess.createMessage();
+          producer.send(m);
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Sent messages");
+
+       int count = 0;
+       while (true)
+       {
+          Message m = consumer.receive(200);
+          if (m == null) break;
+          count++;
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Received " + count + " messages");
+
+       // JUnit's signature is assertEquals(expected, actual)
+       assertEquals(NUM_MESSAGES, count);
+
+       consumerSess.recover();
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Session recover called");
+
+       Message m = null;
+
+       // Deliberately blocking receive: if not enough messages are redelivered,
+       // the test will time out
+       int i = 0;
+       for (; i < NUM_MESSAGES; i++)
+       {
+          m = consumer.receive();
+          log.trace("Received message " + i);
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Received " + i + " messages after recover");
+
+       // Fail with an assertion rather than a NullPointerException on acknowledge()
+       assertNotNull(m);
+
+       m.acknowledge();
+
+       assertRemainingMessages(0);
+
+       // make sure I don't receive anything else
+
+       checkEmpty(queue1);
+
+       conn.close();
+    }
+    finally
+    {
+       if (conn != null)
+       {
+          conn.close();
+       }
+    }
+ }
+
+
+
+ /**
+ * Sends a batch of messages, acknowledges each one as soon as it is received and checks that
+ * nothing is delivered again after the session is recovered.
+ */
+ public void testIndividualClientAcknowledge() throws Exception
+ {
+    Connection connection = null;
+
+    try
+    {
+       connection = cf.createConnection();
+
+       Session sendSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageProducer sender = sendSession.createProducer(queue1);
+
+       Session receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageConsumer receiver = receiveSession.createConsumer(queue1);
+       connection.start();
+
+       final int NUM_MESSAGES = 20;
+
+       for (int sent = 0; sent < NUM_MESSAGES; sent++)
+       {
+          sender.send(sendSession.createMessage());
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       for (int acked = 0; acked < NUM_MESSAGES; acked++)
+       {
+          Message received = receiver.receive(200);
+          assertNotNull(received);
+
+          // The count only drops once the message has been acknowledged.
+          assertRemainingMessages(NUM_MESSAGES - acked);
+          received.acknowledge();
+          assertRemainingMessages(NUM_MESSAGES - acked - 1);
+       }
+
+       assertRemainingMessages(0);
+
+       receiveSession.recover();
+
+       Message afterRecover = receiver.receive(200);
+       assertNull(afterRecover);
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+    }
+ }
+
+
+ /**
+ * Send some messages, acknowledge them once after all have been received and verify they are
+ * not resent after recovery.
+ */
+ public void testBulkClientAcknowledge() throws Exception
+ {
+    Connection conn = null;
+
+    try
+    {
+       conn = cf.createConnection();
+
+       Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageProducer producer = producerSess.createProducer(queue1);
+
+       Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageConsumer consumer = consumerSess.createConsumer(queue1);
+       conn.start();
+
+       final int NUM_MESSAGES = 20;
+
+       // Send some messages
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          Message m = producerSess.createMessage();
+          producer.send(m);
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Sent messages");
+
+       Message m = null;
+       int count = 0;
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          m = consumer.receive(200);
+          if (m == null) break;
+          count++;
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       // m is the last message received; null means a delivery was missed
+       assertNotNull(m);
+
+       // A single acknowledge() on the last message is expected to clear the whole queue
+       m.acknowledge();
+
+       assertRemainingMessages(0);
+
+       log.trace("Received " + count + " messages");
+
+       // JUnit's signature is assertEquals(expected, actual)
+       assertEquals(NUM_MESSAGES, count);
+
+       consumerSess.recover();
+
+       log.trace("Session recover called");
+
+       m = consumer.receive(200);
+
+       log.trace("Message is:" + m);
+
+       assertNull(m);
+    }
+    finally
+    {
+       if (conn != null)
+       {
+          conn.close();
+       }
+    }
+ }
+
+
+ /**
+ * Send some messages, acknowledge some of them, and verify that the others are resent after
+ * recovery.
+ */
+ public void testPartialClientAcknowledge() throws Exception
+ {
+    Connection conn = null;
+
+    try
+    {
+       conn = cf.createConnection();
+
+       Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageProducer producer = producerSess.createProducer(queue1);
+
+       Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageConsumer consumer = consumerSess.createConsumer(queue1);
+       conn.start();
+
+       final int NUM_MESSAGES = 20;
+       final int ACKED_MESSAGES = 11;
+
+       // Send some messages
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          Message m = producerSess.createMessage();
+          producer.send(m);
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Sent messages");
+
+       int count = 0;
+
+       Message m = null;
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          m = consumer.receive(200);
+          if (m == null)
+          {
+             break;
+          }
+          // Acknowledge only once, on the ACKED_MESSAGES-th message received
+          if (count == ACKED_MESSAGES - 1)
+          {
+             m.acknowledge();
+          }
+          count++;
+       }
+
+       assertRemainingMessages(NUM_MESSAGES - ACKED_MESSAGES);
+
+       assertNotNull(m);
+
+       log.trace("Received " + count + " messages");
+
+       // JUnit's signature is assertEquals(expected, actual)
+       assertEquals(NUM_MESSAGES, count);
+
+       consumerSess.recover();
+
+       log.trace("Session recover called");
+
+       count = 0;
+       while (true)
+       {
+          m = consumer.receive(200);
+          if (m == null) break;
+          count++;
+       }
+
+       assertEquals(NUM_MESSAGES - ACKED_MESSAGES, count);
+    }
+    finally
+    {
+       try
+       {
+          if (conn != null)
+          {
+             conn.close();
+          }
+       }
+       finally
+       {
+          // Always clear the unacknowledged leftovers, even if close() failed
+          removeAllMessages(queue1.getQueueName(), true, 0);
+       }
+    }
+ }
+
+
+
+ /**
+ * Send some messages, consume them in an AUTO_ACKNOWLEDGE session and verify that the
+ * remaining count drops with every receive and that nothing is resent upon recovery.
+ */
+ public void testAutoAcknowledge() throws Exception
+ {
+    Connection conn = null;
+
+    try
+    {
+       conn = cf.createConnection();
+
+       Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer producer = producerSess.createProducer(queue1);
+
+       Session consumerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageConsumer consumer = consumerSess.createConsumer(queue1);
+       conn.start();
+
+       final int NUM_MESSAGES = 20;
+
+       // Send some messages
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          Message m = producerSess.createMessage();
+          producer.send(m);
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Sent messages");
+
+       int count = 0;
+
+       Message m = null;
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          assertRemainingMessages(NUM_MESSAGES - i);
+
+          m = consumer.receive(200);
+
+          // Check for a missed delivery first, so the failure is not reported
+          // as a confusing remaining-count mismatch
+          assertNotNull(m);
+
+          assertRemainingMessages(NUM_MESSAGES - (i + 1));
+
+          count++;
+       }
+
+       assertRemainingMessages(0);
+
+       log.trace("Received " + count + " messages");
+
+       // JUnit's signature is assertEquals(expected, actual)
+       assertEquals(NUM_MESSAGES, count);
+
+       consumerSess.recover();
+
+       log.trace("Session recover called");
+
+       m = consumer.receive(200);
+
+       log.trace("Message is:" + m);
+
+       assertNull(m);
+    }
+    finally
+    {
+       if (conn != null)
+       {
+          conn.close();
+       }
+    }
+ }
+
+
+ /**
+ * Consumes 19 queue messages through a DUPS_OK_ACKNOWLEDGE session created from a specially
+ * deployed connection factory. The remaining count is expected to stay at 19 until the tenth
+ * message has been received, then stay at 9, and reach 0 once the consumer session is closed.
+ */
+ public void testDupsOKAcknowledgeQueue() throws Exception
+ {
+    // NOTE(review): passed as the last deployConnectionFactory argument; presumably the
+    // dups-OK acknowledgement batch size - confirm against JMSTestCase.
+    final int BATCH_SIZE = 10;
+    final int NUM_MESSAGES = 19;
+
+    ArrayList<String> jndiBindings = new ArrayList<String>();
+    jndiBindings.add("mycf");
+    deployConnectionFactory(null, "MyConnectionFactory2", jndiBindings, -1, -1, -1, -1, false, false, false, BATCH_SIZE);
+
+    Connection connection = null;
+
+    try
+    {
+       ConnectionFactory factory = (ConnectionFactory)ic.lookup("/mycf");
+
+       connection = factory.createConnection();
+
+       Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer sender = sendSession.createProducer(queue1);
+
+       Session receiveSession = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+       MessageConsumer receiver = receiveSession.createConsumer(queue1);
+       connection.start();
+
+       for (int sent = 0; sent < NUM_MESSAGES; sent++)
+       {
+          sender.send(sendSession.createMessage());
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Sent messages");
+
+       for (int received = 1; received <= NUM_MESSAGES; received++)
+       {
+          Message message = receiver.receive(200);
+
+          assertNotNull(message);
+
+          // Nothing is removed before the tenth receive; from then on nine are left.
+          if (received < BATCH_SIZE)
+          {
+             assertRemainingMessages(NUM_MESSAGES);
+          }
+          else
+          {
+             assertRemainingMessages(NUM_MESSAGES - BATCH_SIZE);
+          }
+       }
+
+       // Closing the session must acknowledge whatever is still outstanding.
+       receiveSession.close();
+
+       assertRemainingMessages(0);
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+
+       undeployConnectionFactory("MyConnectionFactory2");
+    }
+ }
+
+
+ /**
+ * Publishes 19 messages to topic1 and checks that a DUPS_OK_ACKNOWLEDGE consumer, created
+ * from a specially deployed connection factory, receives every one of them.
+ */
+ public void testDupsOKAcknowledgeTopic() throws Exception
+ {
+    final int BATCH_SIZE = 10;
+    final int NUM_MESSAGES = 19;
+
+    ArrayList<String> jndiBindings = new ArrayList<String>();
+    jndiBindings.add("mycf");
+    deployConnectionFactory(null, "MyConnectionFactory2", jndiBindings, -1, -1, -1, -1, false, false, false, BATCH_SIZE);
+
+    Connection connection = null;
+
+    try
+    {
+       ConnectionFactory factory = (ConnectionFactory)ic.lookup("/mycf");
+
+       connection = factory.createConnection();
+
+       Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer sender = sendSession.createProducer(topic1);
+
+       Session receiveSession = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+       MessageConsumer receiver = receiveSession.createConsumer(topic1);
+       connection.start();
+
+       for (int sent = 0; sent < NUM_MESSAGES; sent++)
+       {
+          sender.send(sendSession.createMessage());
+       }
+
+       log.trace("Sent messages");
+
+       for (int received = 0; received < NUM_MESSAGES; received++)
+       {
+          assertNotNull(receiver.receive(200));
+       }
+
+       receiveSession.close();
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+
+       undeployConnectionFactory("MyConnectionFactory2");
+    }
+ }
+
+
+
+ /**
+ * Send some messages, consume them in a DUPS_OK_ACKNOWLEDGE session and verify the messages
+ * are not sent again upon recovery.
+ */
+ public void testLazyAcknowledge() throws Exception
+ {
+    Connection conn = null;
+
+    try
+    {
+       conn = cf.createConnection();
+
+       Session producerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+       MessageProducer producer = producerSess.createProducer(queue1);
+
+       Session consumerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+       MessageConsumer consumer = consumerSess.createConsumer(queue1);
+       conn.start();
+
+       final int NUM_MESSAGES = 20;
+
+       // Send some messages
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          Message m = producerSess.createMessage();
+          producer.send(m);
+       }
+
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Sent messages");
+
+       int count = 0;
+
+       Message m = null;
+       for (int i = 0; i < NUM_MESSAGES; i++)
+       {
+          m = consumer.receive(200);
+          if (m == null) break;
+          count++;
+       }
+
+       // m is the last message received; null means a delivery was missed
+       assertNotNull(m);
+
+       // The test expects all messages to still be counted as remaining at this point
+       assertRemainingMessages(NUM_MESSAGES);
+
+       log.trace("Received " + count + " messages");
+
+       // JUnit's signature is assertEquals(expected, actual)
+       assertEquals(NUM_MESSAGES, count);
+
+       consumerSess.recover();
+
+       log.trace("Session recover called");
+
+       m = consumer.receive(200);
+
+       log.trace("Message is:" + m);
+
+       assertNull(m);
+    }
+    finally
+    {
+       if (conn != null)
+       {
+          conn.close();
+       }
+    }
+ }
+
+ /**
+ * Sends the messages "a", "b" and "c" to queue1 and consumes them with a
+ * MessageListenerAutoAck on an AUTO_ACKNOWLEDGE session. The listener performs the
+ * per-message checks; this method waits for it and verifies the queue ends up empty.
+ */
+ public void testMessageListenerAutoAck() throws Exception
+ {
+    Connection connection = null;
+
+    try
+    {
+       connection = cf.createConnection();
+       Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer sender = sendSession.createProducer(queue1);
+
+       log.trace("Sending messages");
+
+       TextMessage first = sendSession.createTextMessage("a");
+       TextMessage second = sendSession.createTextMessage("b");
+       TextMessage third = sendSession.createTextMessage("c");
+       sender.send(first);
+       sender.send(second);
+       sender.send(third);
+
+       log.trace("Sent messages");
+
+       sendSession.close();
+
+       assertRemainingMessages(3);
+
+       connection.start();
+
+       Session receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+       log.trace("Creating consumer");
+       MessageConsumer receiver = receiveSession.createConsumer(queue1);
+       log.trace("Created consumer");
+
+       MessageListenerAutoAck autoAckListener = new MessageListenerAutoAck(receiveSession);
+
+       log.trace("Setting message listener");
+       receiver.setMessageListener(autoAckListener);
+       log.trace("Set message listener");
+
+       autoAckListener.waitForMessages();
+
+       // Short pause after the latch is released, before checking the final count.
+       Thread.sleep(500);
+
+       assertRemainingMessages(0);
+
+       assertFalse(autoAckListener.failed);
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+    }
+ }
+
+ /**
+ * Checks recover() on an AUTO_ACKNOWLEDGE session. This test will:
+ * - Send two messages over a producer, then close that connection
+ * - Receive one message over a consumer on a new connection
+ * - Call recover
+ * - Receive the second message
+ * - The queue should be empty after that
+ * Note: testMessageListenerAutoAck will test a similar case using MessageListeners
+ */
+ public void testRecoverAutoACK() throws Exception
+ {
+    Connection conn = null;
+
+    try
+    {
+       conn = cf.createConnection();
+       Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer p = s.createProducer(queue1);
+       p.setDeliveryMode(DeliveryMode.PERSISTENT);
+       Message m = s.createTextMessage("one");
+       p.send(m);
+       m = s.createTextMessage("two");
+       p.send(m);
+       conn.close();
+
+       // Avoid a second close in the finally block if the next assertion fails
+       conn = null;
+
+       assertRemainingMessages(2);
+
+       conn = cf.createConnection();
+
+       conn.start();
+
+       Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+       MessageConsumer consumer = session.createConsumer(queue1);
+
+       TextMessage messageReceived = (TextMessage)consumer.receive(1000);
+
+       assertNotNull(messageReceived);
+
+       assertEquals("one", messageReceived.getText());
+
+       session.recover();
+
+       messageReceived = (TextMessage)consumer.receive(1000);
+
+       // Fail with an assertion rather than a NullPointerException on getText()
+       assertNotNull(messageReceived);
+
+       assertEquals("two", messageReceived.getText());
+
+       consumer.close();
+
+       // The session is left open here on purpose; it is closed with the connection.
+
+       assertRemainingMessages(0);
+    }
+    finally
+    {
+       if (conn != null)
+       {
+          conn.close();
+       }
+
+       // NOTE(review): this test never creates or uses "MyQueue2"; this call looks like a
+       // leftover from an earlier version - confirm it is still needed.
+       destroyQueue("MyQueue2");
+    }
+ }
+
+
+
+ /**
+ * Sends the messages "a", "b" and "c" to queue1 and consumes them with a
+ * MessageListenerDupsOK on a DUPS_OK_ACKNOWLEDGE session. All three messages are expected to
+ * remain until the connection is closed, after which the queue must be empty.
+ */
+ public void testMessageListenerDupsOK() throws Exception
+ {
+    Connection connection = null;
+
+    try
+    {
+       connection = cf.createConnection();
+       Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer sender = sendSession.createProducer(queue1);
+
+       log.trace("Sending messages");
+
+       TextMessage first = sendSession.createTextMessage("a");
+       TextMessage second = sendSession.createTextMessage("b");
+       TextMessage third = sendSession.createTextMessage("c");
+       sender.send(first);
+       sender.send(second);
+       sender.send(third);
+
+       log.trace("Sent messages");
+
+       sendSession.close();
+
+       assertRemainingMessages(3);
+
+       connection.start();
+
+       Session receiveSession = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+
+       log.trace("Creating consumer");
+       MessageConsumer receiver = receiveSession.createConsumer(queue1);
+       log.trace("Created consumer");
+
+       MessageListenerDupsOK dupsOkListener = new MessageListenerDupsOK(receiveSession);
+
+       log.trace("Setting message listener");
+       receiver.setMessageListener(dupsOkListener);
+       log.trace("Set message listener");
+
+       dupsOkListener.waitForMessages();
+
+       // Still three remaining while the connection is open ...
+       assertRemainingMessages(3);
+
+       connection.close();
+
+       Thread.sleep(500);
+
+       // ... and none once it has been closed.
+       assertRemainingMessages(0);
+       assertFalse(dupsOkListener.failed);
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+    }
+ }
+
+ /**
+ * Sends the messages "a", "b" and "c" to queue1 and consumes them with a
+ * MessageListenerClientAck on a CLIENT_ACKNOWLEDGE session. The listener drives the
+ * acknowledge/recover sequence; this method waits for it and verifies the queue ends up empty.
+ */
+ public void testMessageListenerClientAck() throws Exception
+ {
+    Connection connection = null;
+
+    try
+    {
+       connection = cf.createConnection();
+       Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer sender = sendSession.createProducer(queue1);
+
+       TextMessage first = sendSession.createTextMessage("a");
+       TextMessage second = sendSession.createTextMessage("b");
+       TextMessage third = sendSession.createTextMessage("c");
+       sender.send(first);
+       sender.send(second);
+       sender.send(third);
+       sendSession.close();
+
+       assertRemainingMessages(3);
+
+       connection.start();
+       Session receiveSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+       MessageConsumer receiver = receiveSession.createConsumer(queue1);
+       MessageListenerClientAck clientAckListener = new MessageListenerClientAck(receiveSession);
+       receiver.setMessageListener(clientAckListener);
+
+       clientAckListener.waitForMessages();
+
+       Thread.sleep(500);
+
+       assertRemainingMessages(0);
+
+       connection.close();
+
+       assertFalse(clientAckListener.failed);
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+    }
+ }
+
+
+ /**
+ * Sends the messages "a", "b" and "c" to queue1 and consumes them with a
+ * MessageListenerTransactionalAck on a transacted session. The listener drives the
+ * rollback/commit sequence; this method waits for it and verifies the queue ends up empty.
+ */
+ public void testMessageListenerTransactionalAck() throws Exception
+ {
+    Connection connection = null;
+
+    try
+    {
+       connection = cf.createConnection();
+       Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       MessageProducer sender = sendSession.createProducer(queue1);
+
+       TextMessage first = sendSession.createTextMessage("a");
+       TextMessage second = sendSession.createTextMessage("b");
+       TextMessage third = sendSession.createTextMessage("c");
+       sender.send(first);
+       sender.send(second);
+       sender.send(third);
+       sendSession.close();
+
+       assertRemainingMessages(3);
+
+       connection.start();
+       Session receiveSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+       MessageConsumer receiver = receiveSession.createConsumer(queue1);
+       MessageListenerTransactionalAck txListener = new MessageListenerTransactionalAck(receiveSession);
+       receiver.setMessageListener(txListener);
+       txListener.waitForMessages();
+
+       Thread.sleep(500);
+
+       assertRemainingMessages(0);
+
+       connection.close();
+
+       assertFalse(txListener.failed);
+    }
+    finally
+    {
+       if (connection != null)
+       {
+          connection.close();
+       }
+    }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+
+ private abstract class LatchListener implements MessageListener
+ {
+ protected Latch latch = new Latch();
+
+ protected Session sess;
+
+ protected int count = 0;
+
+ boolean failed;
+
+ LatchListener(Session sess)
+ {
+ this.sess = sess;
+ }
+
+ public void waitForMessages() throws InterruptedException
+ {
+ assertTrue("failed to receive all messages",latch.attempt(2000));
+ }
+
+ public abstract void onMessage(Message m);
+
+ }
+
+ private class MessageListenerAutoAck extends LatchListener
+ {
+
+ MessageListenerAutoAck(Session sess)
+ {
+ super(sess);
+ }
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ count++;
+
+ TextMessage tm = (TextMessage)m;
+
+ log.info("Got message: " + tm.getText());
+
+ // Receive first three messages then recover() session
+ // Only last message should be redelivered
+ if (count == 1)
+ {
+ assertRemainingMessages(3);
+
+ if (!"a".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 2)
+ {
+ assertRemainingMessages(2);
+
+ if (!"b".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 3)
+ {
+ assertRemainingMessages(1);
+
+ if (!"c".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ sess.recover();
+ }
+ if (count == 4)
+ {
+ assertRemainingMessages(1);
+
+ if (!"c".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ latch.release();
+ }
+
+ }
+ catch (Exception e)
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+
+ }
+
+ private class MessageListenerDupsOK extends LatchListener
+ {
+
+ MessageListenerDupsOK(Session sess)
+ {
+ super(sess);
+ }
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ count++;
+
+ TextMessage tm = (TextMessage)m;
+
+ // Receive first three messages then recover() session
+ // Only last message should be redelivered
+ if (count == 1)
+ {
+ assertRemainingMessages(3);
+
+ if (!"a".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 2)
+ {
+ assertRemainingMessages(3);
+
+ if (!"b".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 3)
+ {
+ assertRemainingMessages(3);
+
+ if (!"c".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ sess.recover();
+ }
+ if (count == 4)
+ {
+ assertRemainingMessages(3);
+
+ if (!"c".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ latch.release();
+ }
+
+ }
+ catch (Exception e)
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+
+ }
+
+
+ private class MessageListenerClientAck extends LatchListener
+ {
+
+ MessageListenerClientAck(Session sess)
+ {
+ super(sess);
+ }
+
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ count++;
+
+ TextMessage tm = (TextMessage)m;
+
+ if (count == 1)
+ {
+ assertRemainingMessages(3);
+ if (!"a".equals(tm.getText()))
+ {
+ log.trace("Expected a but got " + tm.getText());
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 2)
+ {
+ assertRemainingMessages(3);
+ if (!"b".equals(tm.getText()))
+ {
+ log.trace("Expected b but got " + tm.getText());
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 3)
+ {
+ assertRemainingMessages(3);
+ if (!"c".equals(tm.getText()))
+ {
+ log.trace("Expected c but got " + tm.getText());
+ failed = true;
+ latch.release();
+ }
+ sess.recover();
+ }
+ if (count == 4)
+ {
+ assertRemainingMessages(3);
+ if (!"a".equals(tm.getText()))
+ {
+ log.trace("Expected a but got " + tm.getText());
+ failed = true;
+ latch.release();
+ }
+ tm.acknowledge();
+ assertRemainingMessages(2);
+ sess.recover();
+ }
+ if (count == 5)
+ {
+ assertRemainingMessages(2);
+ if (!"b".equals(tm.getText()))
+ {
+ log.trace("Expected b but got " + tm.getText());
+ failed = true;
+ latch.release();
+ }
+ sess.recover();
+ }
+ if (count == 6)
+ {
+ assertRemainingMessages(2);
+ if (!"b".equals(tm.getText()))
+ {
+ log.trace("Expected b but got " + tm.getText());
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 7)
+ {
+ assertRemainingMessages(2);
+ if (!"c".equals(tm.getText()))
+ {
+ log.trace("Expected c but got " + tm.getText());
+ failed = true;
+ latch.release();
+ }
+ tm.acknowledge();
+ assertRemainingMessages(0);
+ latch.release();
+ }
+
+ }
+ catch (Exception e)
+ {
+ log.error("Caught exception", e);
+ failed = true;
+ latch.release();
+ }
+ }
+
+ }
+
+ private class MessageListenerTransactionalAck extends LatchListener
+ {
+
+ MessageListenerTransactionalAck(Session sess)
+ {
+ super(sess);
+ }
+
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ count++;
+
+ TextMessage tm = (TextMessage)m;
+
+ if (count == 1)
+ {
+ assertRemainingMessages(3);
+ if (!"a".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 2)
+ {
+ assertRemainingMessages(3);
+ if (!"b".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 3)
+ {
+ assertRemainingMessages(3);
+ if (!"c".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ log.trace("Rollback");
+ sess.rollback();
+ }
+ if (count == 4)
+ {
+ assertRemainingMessages(3);
+ if (!"a".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ }
+ if (count == 5)
+ {
+ assertRemainingMessages(3);
+ if (!"b".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ log.trace("commit");
+ sess.commit();
+ assertRemainingMessages(1);
+ }
+ if (count == 6)
+ {
+ assertRemainingMessages(1);
+ if (!"c".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ log.trace("recover");
+ sess.rollback();
+ }
+ if (count == 7)
+ {
+ assertRemainingMessages(1);
+ if (!"c".equals(tm.getText()))
+ {
+ failed = true;
+ latch.release();
+ }
+ log.trace("Commit");
+ sess.commit();
+ assertRemainingMessages(0);
+ latch.release();
+ }
+ }
+ catch (Exception e)
+ {
+ //log.error(e);
+ failed = true;
+ latch.release();
+ }
+ }
+
+ }
}
+
+
More information about the jboss-cvs-commits
mailing list