[jboss-cvs] JBoss Messaging SVN: r4265 - trunk/tests/jms-tests/src/org/jboss/test/messaging/jms.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 21 09:24:03 EDT 2008
Author: ataylor
Date: 2008-05-21 09:24:03 -0400 (Wed, 21 May 2008)
New Revision: 4265
Modified:
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
Log:
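test fix for blocking sync: AcknowledgementTest now creates a dedicated connection factory ("ackconnectionfactory") in setUp() and destroys it in tearDown(); the other hunks shown below are import and whitespace reformatting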
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2008-05-21 13:06:37 UTC (rev 4264)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2008-05-21 13:24:03 UTC (rev 4265)
@@ -21,28 +21,16 @@
*/
package org.jboss.test.messaging.jms;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.TopicConnection;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
import EDU.oswego.cs.dl.util.concurrent.Latch;
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import javax.jms.*;
import java.util.ArrayList;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * $Id: AcknowledgementTest.java 3173 2007-10-05 12:48:16Z timfox $
+ * <p/>
+ * $Id: AcknowledgementTest.java 3173 2007-10-05 12:48:16Z timfox $
*/
public class AcknowledgementTest extends JMSTestCase
{
@@ -61,9 +49,24 @@
// TestCase overrides -------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ getJmsServerManager().createConnectionFactory("ackconnectionfactory", null, 1000, true, 1024 * 1024, -1, 1000, -1, true, "/ackconnectionfactory");
+ cf = (JBossConnectionFactory) getInitialContext().lookup("/ackconnectionfactory");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ getJmsServerManager().destroyConnectionFactory("ackconnectionfactory");
+ cf = null;
+ }
+
// Public --------------------------------------------------------
/* Topics shouldn't hold on to messages if there are no subscribers */
+
public void testPersistentMessagesForTopicDropped() throws Exception
{
TopicConnection conn = null;
@@ -111,7 +114,7 @@
sess.commit();
//receive but rollback
- TextMessage m2 = (TextMessage)sub.receive(3000);
+ TextMessage m2 = (TextMessage) sub.receive(3000);
assertNotNull(m2);
assertEquals("testing123", m2.getText());
@@ -147,13 +150,13 @@
pub.publish(m);
sess.commit();
- TextMessage m2 = (TextMessage)cons.receive(3000);
+ TextMessage m2 = (TextMessage) cons.receive(3000);
assertNotNull(m2);
assertEquals("testing123", m2.getText());
sess.rollback();
- m2 = (TextMessage)cons.receive(3000);
+ m2 = (TextMessage) cons.receive(3000);
assertNotNull(m2);
assertEquals("testing123", m2.getText());
@@ -172,7 +175,7 @@
newpub.publish(m3);
newsess.commit();
- TextMessage m4 = (TextMessage)newcons.receive(3000);
+ TextMessage m4 = (TextMessage) newcons.receive(3000);
assertNotNull(m4);
assertEquals("testing456", m4.getText());
@@ -181,13 +184,13 @@
newpub.publish(m3);
newsess.commit();
- TextMessage m5 = (TextMessage)newcons.receive(3000);
+ TextMessage m5 = (TextMessage) newcons.receive(3000);
assertNotNull(m5);
assertEquals("testing456", m5.getText());
newsess.rollback();
- TextMessage m6 = (TextMessage)newcons.receive(3000);
+ TextMessage m6 = (TextMessage) newcons.receive(3000);
assertNotNull(m6);
assertEquals("testing456", m6.getText());
@@ -209,470 +212,468 @@
try
{
- conn = cf.createConnection();
+ conn = cf.createConnection();
- Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer producer = producerSess.createProducer(queue1);
+ Session producerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = producerSess.createProducer(queue1);
- Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = consumerSess.createConsumer(queue1);
- conn.start();
+ Session consumerSess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
- final int NUM_MESSAGES = 20;
+ final int NUM_MESSAGES = 20;
- //Send some messages
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- producerSess.rollback();
+ producerSess.rollback();
- //Send some messages
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
- assertRemainingMessages(0);
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
+ assertRemainingMessages(0);
- producerSess.commit();
+ producerSess.commit();
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- int count = 0;
- while (true)
- {
- Message m = consumer.receive(200);
- if (m == null) break;
- count++;
- }
+ int count = 0;
+ while (true)
+ {
+ Message m = consumer.receive(200);
+ if (m == null) break;
+ count++;
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Received " + count + " messages");
+ log.trace("Received " + count + " messages");
- assertEquals(count, NUM_MESSAGES);
+ assertEquals(count, NUM_MESSAGES);
- consumerSess.rollback();
+ consumerSess.rollback();
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Session rollback called");
+ log.trace("Session rollback called");
- int i = 0;
- for(; i < NUM_MESSAGES; i++)
- {
- consumer.receive();
- log.trace("Received message " + i);
- }
+ int i = 0;
+ for (; i < NUM_MESSAGES; i++)
+ {
+ consumer.receive();
+ log.trace("Received message " + i);
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- // if I don't receive enough messages, the test will timeout
+ // if I don't receive enough messages, the test will timeout
- log.trace("Received " + i + " messages after recover");
+ log.trace("Received " + i + " messages after recover");
- consumerSess.commit();
+ consumerSess.commit();
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- checkEmpty(queue1);
+ checkEmpty(queue1);
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
+ if (conn != null)
+ {
+ conn.close();
+ }
}
}
- /**
- * Send some messages, don't acknowledge them and verify that they are re-sent on recovery.
- */
- public void testClientAcknowledgeNoAcknowledgement() throws Exception
+ /**
+ * Send some messages, don't acknowledge them and verify that they are re-sent on recovery.
+ */
+ public void testClientAcknowledgeNoAcknowledgement() throws Exception
{
- Connection conn = null;
+ Connection conn = null;
- try
- {
- conn = cf.createConnection();
+ try
+ {
+ conn = cf.createConnection();
- Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSess.createProducer(queue1);
+ Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue1);
- Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSess.createConsumer(queue1);
- conn.start();
+ Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
- final int NUM_MESSAGES = 20;
+ final int NUM_MESSAGES = 20;
- //Send some messages
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- int count = 0;
- while (true)
- {
- Message m = consumer.receive(200);
- if (m == null) break;
- count++;
- }
+ int count = 0;
+ while (true)
+ {
+ Message m = consumer.receive(200);
+ if (m == null) break;
+ count++;
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Received " + count + " messages");
+ log.trace("Received " + count + " messages");
- assertEquals(count, NUM_MESSAGES);
+ assertEquals(count, NUM_MESSAGES);
- consumerSess.recover();
+ consumerSess.recover();
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Session recover called");
+ log.trace("Session recover called");
- Message m = null;
+ Message m = null;
- int i = 0;
- for(; i < NUM_MESSAGES; i++)
- {
- m = consumer.receive();
- log.trace("Received message " + i);
+ int i = 0;
+ for (; i < NUM_MESSAGES; i++)
+ {
+ m = consumer.receive();
+ log.trace("Received message " + i);
- }
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- // if I don't receive enough messages, the test will timeout
+ // if I don't receive enough messages, the test will timeout
- log.trace("Received " + i + " messages after recover");
+ log.trace("Received " + i + " messages after recover");
- m.acknowledge();
+ m.acknowledge();
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- // make sure I don't receive anything else
+ // make sure I don't receive anything else
- checkEmpty(queue1);
+ checkEmpty(queue1);
- conn.close();
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
+ conn.close();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
-
- /**
- * Send some messages, acknowledge them individually and verify they are not resent after
+ /**
+ * Send some messages, acknowledge them individually and verify they are not resent after
* recovery.
- */
- public void testIndividualClientAcknowledge() throws Exception
+ */
+ public void testIndividualClientAcknowledge() throws Exception
{
- Connection conn = null;
+ Connection conn = null;
- try
- {
- conn = cf.createConnection();
+ try
+ {
+ conn = cf.createConnection();
- Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSess.createProducer(queue1);
+ Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue1);
- Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSess.createConsumer(queue1);
- conn.start();
+ Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
- final int NUM_MESSAGES = 20;
+ final int NUM_MESSAGES = 20;
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = consumer.receive(200);
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = consumer.receive(200);
- assertNotNull(m);
+ assertNotNull(m);
- assertRemainingMessages(NUM_MESSAGES - i);
+ assertRemainingMessages(NUM_MESSAGES - i);
- m.acknowledge();
+ m.acknowledge();
- assertRemainingMessages(NUM_MESSAGES - (i + 1));
- }
+ assertRemainingMessages(NUM_MESSAGES - (i + 1));
+ }
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- consumerSess.recover();
+ consumerSess.recover();
- Message m = consumer.receive(200);
- assertNull(m);
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
+ Message m = consumer.receive(200);
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
- /**
- * Send some messages, acknowledge them once after all have been received verify they are not
+ /**
+ * Send some messages, acknowledge them once after all have been received verify they are not
* resent after recovery
- */
- public void testBulkClientAcknowledge() throws Exception
+ */
+ public void testBulkClientAcknowledge() throws Exception
{
- Connection conn = null;
+ Connection conn = null;
- try
- {
- conn = cf.createConnection();
+ try
+ {
+ conn = cf.createConnection();
- Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSess.createProducer(queue1);
+ Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue1);
- Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSess.createConsumer(queue1);
- conn.start();
+ Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
- final int NUM_MESSAGES = 20;
+ final int NUM_MESSAGES = 20;
- //Send some messages
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- Message m = null;
- int count = 0;
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- m = consumer.receive(200);
- if (m == null) break;
- count++;
- }
+ Message m = null;
+ int count = 0;
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ m = consumer.receive(200);
+ if (m == null) break;
+ count++;
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- assertNotNull(m);
+ assertNotNull(m);
- m.acknowledge();
+ m.acknowledge();
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- log.trace("Received " + count + " messages");
+ log.trace("Received " + count + " messages");
- assertEquals(count, NUM_MESSAGES);
+ assertEquals(count, NUM_MESSAGES);
- consumerSess.recover();
+ consumerSess.recover();
- log.trace("Session recover called");
+ log.trace("Session recover called");
- m = consumer.receive(200);
+ m = consumer.receive(200);
- log.trace("Message is:" + m);
+ log.trace("Message is:" + m);
- assertNull(m);
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
- /**
- * Send some messages, acknowledge some of them, and verify that the others are resent after
+ /**
+ * Send some messages, acknowledge some of them, and verify that the others are resent after
* delivery
- */
- public void testPartialClientAcknowledge() throws Exception
+ */
+ public void testPartialClientAcknowledge() throws Exception
{
- Connection conn = null;
+ Connection conn = null;
- try
- {
+ try
+ {
- conn = cf.createConnection();
+ conn = cf.createConnection();
- Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageProducer producer = producerSess.createProducer(queue1);
+ Session producerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue1);
- Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSess.createConsumer(queue1);
- conn.start();
+ Session consumerSess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
- final int NUM_MESSAGES = 20;
- final int ACKED_MESSAGES = 11;
+ final int NUM_MESSAGES = 20;
+ final int ACKED_MESSAGES = 11;
- //Send some messages
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- int count = 0;
+ int count = 0;
- Message m = null;
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- m = consumer.receive(200);
- if (m == null)
- {
- break;
- }
- if (count == ACKED_MESSAGES -1)
- {
- m.acknowledge();
- }
- count++;
- }
+ Message m = null;
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ m = consumer.receive(200);
+ if (m == null)
+ {
+ break;
+ }
+ if (count == ACKED_MESSAGES - 1)
+ {
+ m.acknowledge();
+ }
+ count++;
+ }
- assertRemainingMessages(NUM_MESSAGES - ACKED_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES - ACKED_MESSAGES);
- assertNotNull(m);
+ assertNotNull(m);
- log.trace("Received " + count + " messages");
+ log.trace("Received " + count + " messages");
- assertEquals(count, NUM_MESSAGES);
+ assertEquals(count, NUM_MESSAGES);
- consumerSess.recover();
+ consumerSess.recover();
- log.trace("Session recover called");
+ log.trace("Session recover called");
- count = 0;
- while (true)
- {
- m = consumer.receive(200);
- if (m == null) break;
- count++;
- }
+ count = 0;
+ while (true)
+ {
+ m = consumer.receive(200);
+ if (m == null) break;
+ count++;
+ }
- assertEquals(NUM_MESSAGES - ACKED_MESSAGES, count);
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
+ assertEquals(NUM_MESSAGES - ACKED_MESSAGES, count);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
- removeAllMessages(queue1.getQueueName(), true, 0);
- }
+ removeAllMessages(queue1.getQueueName(), true, 0);
+ }
}
-
- /*
- * Send some messages, consume them and verify the messages are not sent upon recovery
- *
- */
- public void testAutoAcknowledge() throws Exception
+ /*
+ * Send some messages, consume them and verify the messages are not sent upon recovery
+ *
+ */
+ public void testAutoAcknowledge() throws Exception
{
- Connection conn = null;
+ Connection conn = null;
- try
- {
+ try
+ {
- conn = cf.createConnection();
+ conn = cf.createConnection();
- Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSess.createProducer(queue1);
+ Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue1);
- Session consumerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSess.createConsumer(queue1);
- conn.start();
+ Session consumerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
- final int NUM_MESSAGES = 20;
+ final int NUM_MESSAGES = 20;
- //Send some messages
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- int count = 0;
+ int count = 0;
- Message m = null;
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- assertRemainingMessages(NUM_MESSAGES - i);
+ Message m = null;
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ assertRemainingMessages(NUM_MESSAGES - i);
- m = consumer.receive(200);
+ m = consumer.receive(200);
- assertRemainingMessages(NUM_MESSAGES - (i + 1));
+ assertRemainingMessages(NUM_MESSAGES - (i + 1));
- if (m == null) break;
- count++;
- }
+ if (m == null) break;
+ count++;
+ }
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- assertNotNull(m);
+ assertNotNull(m);
- log.trace("Received " + count + " messages");
+ log.trace("Received " + count + " messages");
- assertEquals(count, NUM_MESSAGES);
+ assertEquals(count, NUM_MESSAGES);
- consumerSess.recover();
+ consumerSess.recover();
- log.trace("Session recover called");
+ log.trace("Session recover called");
- m = consumer.receive(200);
+ m = consumer.receive(200);
- log.trace("Message is:" + m);
+ log.trace("Message is:" + m);
- assertNull(m);
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
@@ -690,69 +691,69 @@
try
{
- ConnectionFactory myCF = (ConnectionFactory)ic.lookup("/mycf");
+ ConnectionFactory myCF = (ConnectionFactory) ic.lookup("/mycf");
- conn = myCF.createConnection();
+ conn = myCF.createConnection();
- Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSess.createProducer(queue1);
+ Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue1);
- Session consumerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSess.createConsumer(queue1);
- conn.start();
+ Session consumerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
- //Send some messages
- for (int i = 0; i < 19; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ //Send some messages
+ for (int i = 0; i < 19; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- assertRemainingMessages(19);
+ assertRemainingMessages(19);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- Message m = null;
- for (int i = 0; i < 10; i++)
- {
- m = consumer.receive(200);
+ Message m = null;
+ for (int i = 0; i < 10; i++)
+ {
+ m = consumer.receive(200);
- assertNotNull(m);
+ assertNotNull(m);
- if (i == 9)
- {
- assertRemainingMessages(9);
- }
- else
- {
- assertRemainingMessages(19);
- }
- }
+ if (i == 9)
+ {
+ assertRemainingMessages(9);
+ }
+ else
+ {
+ assertRemainingMessages(19);
+ }
+ }
- for (int i = 0; i < 9; i++)
- {
- m = consumer.receive(200);
+ for (int i = 0; i < 9; i++)
+ {
+ m = consumer.receive(200);
- assertNotNull(m);
+ assertNotNull(m);
- assertRemainingMessages(9);
- }
+ assertRemainingMessages(9);
+ }
- //Make sure the last are acked on close
+ //Make sure the last are acked on close
- consumerSess.close();
+ consumerSess.close();
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
+ if (conn != null)
+ {
+ conn.close();
+ }
- undeployConnectionFactory("MyConnectionFactory2");
+ undeployConnectionFactory("MyConnectionFactory2");
}
@@ -771,120 +772,119 @@
try
{
- ConnectionFactory myCF = (ConnectionFactory)ic.lookup("/mycf");
+ ConnectionFactory myCF = (ConnectionFactory) ic.lookup("/mycf");
- conn = myCF.createConnection();
+ conn = myCF.createConnection();
- Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSess.createProducer(topic1);
+ Session producerSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(topic1);
- Session consumerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSess.createConsumer(topic1);
- conn.start();
+ Session consumerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(topic1);
+ conn.start();
- //Send some messages
- for (int i = 0; i < 19; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ //Send some messages
+ for (int i = 0; i < 19; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- log.trace("Sent messages");
+ log.trace("Sent messages");
- Message m = null;
- for (int i = 0; i < 19; i++)
- {
- m = consumer.receive(200);
+ Message m = null;
+ for (int i = 0; i < 19; i++)
+ {
+ m = consumer.receive(200);
- assertNotNull(m);
- }
+ assertNotNull(m);
+ }
- consumerSess.close();
+ consumerSess.close();
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
+ if (conn != null)
+ {
+ conn.close();
+ }
- undeployConnectionFactory("MyConnectionFactory2");
+ undeployConnectionFactory("MyConnectionFactory2");
}
}
-
- /*
- * Send some messages, consume them and verify the messages are not sent upon recovery
- *
- */
- public void testLazyAcknowledge() throws Exception
+ /*
+ * Send some messages, consume them and verify the messages are not sent upon recovery
+ *
+ */
+ public void testLazyAcknowledge() throws Exception
{
- Connection conn = null;
+ Connection conn = null;
- try
- {
+ try
+ {
- conn = cf.createConnection();
+ conn = cf.createConnection();
- Session producerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- MessageProducer producer = producerSess.createProducer(queue1);
+ Session producerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ MessageProducer producer = producerSess.createProducer(queue1);
- Session consumerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- MessageConsumer consumer = consumerSess.createConsumer(queue1);
- conn.start();
+ Session consumerSess = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSess.createConsumer(queue1);
+ conn.start();
- final int NUM_MESSAGES = 20;
+ final int NUM_MESSAGES = 20;
- //Send some messages
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- Message m = producerSess.createMessage();
- producer.send(m);
- }
+ //Send some messages
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ Message m = producerSess.createMessage();
+ producer.send(m);
+ }
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- int count = 0;
+ int count = 0;
- Message m = null;
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- m = consumer.receive(200);
- if (m == null) break;
- count++;
- }
+ Message m = null;
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ m = consumer.receive(200);
+ if (m == null) break;
+ count++;
+ }
- assertNotNull(m);
+ assertNotNull(m);
- assertRemainingMessages(NUM_MESSAGES);
+ assertRemainingMessages(NUM_MESSAGES);
- log.trace("Received " + count + " messages");
+ log.trace("Received " + count + " messages");
- assertEquals(count, NUM_MESSAGES);
+ assertEquals(count, NUM_MESSAGES);
- consumerSess.recover();
+ consumerSess.recover();
- log.trace("Session recover called");
+ log.trace("Session recover called");
- m = consumer.receive(200);
+ m = consumer.receive(200);
- log.trace("Message is:" + m);
+ log.trace("Message is:" + m);
- assertNull(m);
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
@@ -894,57 +894,57 @@
try
{
- conn = cf.createConnection();
- Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessSend.createProducer(queue1);
+ conn = cf.createConnection();
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessSend.createProducer(queue1);
- log.trace("Sending messages");
+ log.trace("Sending messages");
- TextMessage tm1 = sessSend.createTextMessage("a");
- TextMessage tm2 = sessSend.createTextMessage("b");
- TextMessage tm3 = sessSend.createTextMessage("c");
- prod.send(tm1);
- prod.send(tm2);
- prod.send(tm3);
+ TextMessage tm1 = sessSend.createTextMessage("a");
+ TextMessage tm2 = sessSend.createTextMessage("b");
+ TextMessage tm3 = sessSend.createTextMessage("c");
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- sessSend.close();
+ sessSend.close();
- assertRemainingMessages(3);
+ assertRemainingMessages(3);
- conn.start();
+ conn.start();
- Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- log.trace("Creating consumer");
+ log.trace("Creating consumer");
- MessageConsumer cons = sessReceive.createConsumer(queue1);
+ MessageConsumer cons = sessReceive.createConsumer(queue1);
- log.trace("Created consumer");
+ log.trace("Created consumer");
- MessageListenerAutoAck listener = new MessageListenerAutoAck(sessReceive);
+ MessageListenerAutoAck listener = new MessageListenerAutoAck(sessReceive);
- log.trace("Setting message listener");
+ log.trace("Setting message listener");
- cons.setMessageListener(listener);
+ cons.setMessageListener(listener);
- log.trace("Set message listener");
+ log.trace("Set message listener");
- listener.waitForMessages();
+ listener.waitForMessages();
- Thread.sleep(500);
+ Thread.sleep(500);
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- assertFalse(listener.failed);
+ assertFalse(listener.failed);
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
+ if (conn != null)
+ {
+ conn.close();
+ }
}
}
@@ -985,7 +985,7 @@
MessageConsumer consumer = session.createConsumer(queue1);
- TextMessage messageReceived = (TextMessage)consumer.receive(1000);
+ TextMessage messageReceived = (TextMessage) consumer.receive(1000);
assertNotNull(messageReceived);
@@ -993,7 +993,7 @@
session.recover();
- messageReceived = (TextMessage)consumer.receive(1000);
+ messageReceived = (TextMessage) consumer.receive(1000);
assertEquals("two", messageReceived.getText());
@@ -1018,7 +1018,6 @@
}
-
public void testMessageListenerDupsOK() throws Exception
{
Connection conn = null;
@@ -1026,60 +1025,60 @@
try
{
- conn = cf.createConnection();
- Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessSend.createProducer(queue1);
+ conn = cf.createConnection();
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessSend.createProducer(queue1);
- log.trace("Sending messages");
+ log.trace("Sending messages");
- TextMessage tm1 = sessSend.createTextMessage("a");
- TextMessage tm2 = sessSend.createTextMessage("b");
- TextMessage tm3 = sessSend.createTextMessage("c");
- prod.send(tm1);
- prod.send(tm2);
- prod.send(tm3);
+ TextMessage tm1 = sessSend.createTextMessage("a");
+ TextMessage tm2 = sessSend.createTextMessage("b");
+ TextMessage tm3 = sessSend.createTextMessage("c");
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
- log.trace("Sent messages");
+ log.trace("Sent messages");
- sessSend.close();
+ sessSend.close();
- assertRemainingMessages(3);
+ assertRemainingMessages(3);
- conn.start();
+ conn.start();
- Session sessReceive = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
+ Session sessReceive = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- log.trace("Creating consumer");
+ log.trace("Creating consumer");
- MessageConsumer cons = sessReceive.createConsumer(queue1);
+ MessageConsumer cons = sessReceive.createConsumer(queue1);
- log.trace("Created consumer");
+ log.trace("Created consumer");
- MessageListenerDupsOK listener = new MessageListenerDupsOK(sessReceive);
+ MessageListenerDupsOK listener = new MessageListenerDupsOK(sessReceive);
- log.trace("Setting message listener");
+ log.trace("Setting message listener");
- cons.setMessageListener(listener);
+ cons.setMessageListener(listener);
- log.trace("Set message listener");
+ log.trace("Set message listener");
- listener.waitForMessages();
+ listener.waitForMessages();
- assertRemainingMessages(3);
+ assertRemainingMessages(3);
- conn.close();
+ conn.close();
- Thread.sleep(500);
+ Thread.sleep(500);
- assertRemainingMessages(0);
- assertFalse(listener.failed);
+ assertRemainingMessages(0);
+ assertFalse(listener.failed);
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
+ if (conn != null)
+ {
+ conn.close();
+ }
}
}
@@ -1089,42 +1088,42 @@
try
{
- conn = cf.createConnection();
- Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessSend.createProducer(queue1);
+ conn = cf.createConnection();
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessSend.createProducer(queue1);
- TextMessage tm1 = sessSend.createTextMessage("a");
- TextMessage tm2 = sessSend.createTextMessage("b");
- TextMessage tm3 = sessSend.createTextMessage("c");
- prod.send(tm1);
- prod.send(tm2);
- prod.send(tm3);
- sessSend.close();
+ TextMessage tm1 = sessSend.createTextMessage("a");
+ TextMessage tm2 = sessSend.createTextMessage("b");
+ TextMessage tm3 = sessSend.createTextMessage("c");
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ sessSend.close();
- assertRemainingMessages(3);
+ assertRemainingMessages(3);
- conn.start();
- Session sessReceive = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer cons = sessReceive.createConsumer(queue1);
- MessageListenerClientAck listener = new MessageListenerClientAck(sessReceive);
- cons.setMessageListener(listener);
+ conn.start();
+ Session sessReceive = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons = sessReceive.createConsumer(queue1);
+ MessageListenerClientAck listener = new MessageListenerClientAck(sessReceive);
+ cons.setMessageListener(listener);
- listener.waitForMessages();
+ listener.waitForMessages();
- Thread.sleep(500);
+ Thread.sleep(500);
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- conn.close();
+ conn.close();
- assertFalse(listener.failed);
+ assertFalse(listener.failed);
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
+ if (conn != null)
+ {
+ conn.close();
+ }
}
}
@@ -1135,41 +1134,41 @@
try
{
- conn = cf.createConnection();
- Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sessSend.createProducer(queue1);
+ conn = cf.createConnection();
+ Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessSend.createProducer(queue1);
- TextMessage tm1 = sessSend.createTextMessage("a");
- TextMessage tm2 = sessSend.createTextMessage("b");
- TextMessage tm3 = sessSend.createTextMessage("c");
- prod.send(tm1);
- prod.send(tm2);
- prod.send(tm3);
- sessSend.close();
+ TextMessage tm1 = sessSend.createTextMessage("a");
+ TextMessage tm2 = sessSend.createTextMessage("b");
+ TextMessage tm3 = sessSend.createTextMessage("c");
+ prod.send(tm1);
+ prod.send(tm2);
+ prod.send(tm3);
+ sessSend.close();
- assertRemainingMessages(3);
+ assertRemainingMessages(3);
- conn.start();
- Session sessReceive = conn.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer cons = sessReceive.createConsumer(queue1);
- MessageListenerTransactionalAck listener = new MessageListenerTransactionalAck(sessReceive);
- cons.setMessageListener(listener);
- listener.waitForMessages();
+ conn.start();
+ Session sessReceive = conn.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer cons = sessReceive.createConsumer(queue1);
+ MessageListenerTransactionalAck listener = new MessageListenerTransactionalAck(sessReceive);
+ cons.setMessageListener(listener);
+ listener.waitForMessages();
- Thread.sleep(500);
+ Thread.sleep(500);
- assertRemainingMessages(0);
+ assertRemainingMessages(0);
- conn.close();
+ conn.close();
- assertFalse(listener.failed);
+ assertFalse(listener.failed);
}
finally
{
- if (conn != null)
- {
- conn.close();
- }
+ if (conn != null)
+ {
+ conn.close();
+ }
}
}
@@ -1199,7 +1198,7 @@
public void waitForMessages() throws InterruptedException
{
- assertTrue("failed to receive all messages",latch.attempt(2000));
+ assertTrue("failed to receive all messages", latch.attempt(2000));
}
public abstract void onMessage(Message m);
@@ -1220,7 +1219,7 @@
{
count++;
- TextMessage tm = (TextMessage)m;
+ TextMessage tm = (TextMessage) m;
log.info("Got message: " + tm.getText());
@@ -1279,7 +1278,7 @@
}
- private class MessageListenerDupsOK extends LatchListener
+ private class MessageListenerDupsOK extends LatchListener
{
MessageListenerDupsOK(Session sess)
@@ -1293,7 +1292,7 @@
{
count++;
- TextMessage tm = (TextMessage)m;
+ TextMessage tm = (TextMessage) m;
// Receive first three messages then recover() session
// Only last message should be redelivered
@@ -1366,7 +1365,7 @@
{
count++;
- TextMessage tm = (TextMessage)m;
+ TextMessage tm = (TextMessage) m;
if (count == 1)
{
@@ -1473,7 +1472,7 @@
{
count++;
- TextMessage tm = (TextMessage)m;
+ TextMessage tm = (TextMessage) m;
if (count == 1)
{
More information about the jboss-cvs-commits
mailing list