[jboss-user] [JBoss Messaging] - Re: Redelivery problem when using MessageSelectors
Toby Morris
do-not-reply at jboss.com
Mon Dec 6 09:49:37 EST 2010
Toby Morris [http://community.jboss.org/people/theamazingtoby] created the discussion
"Re: Redelivery problem when using MessageSelectors"
To view the discussion, visit: http://community.jboss.org/message/574516#574516
--------------------------------------------------------------
Here's a simple unit test that performs two tests. Both tests send two messages and attempt to receive two messages. The first sends messages with different custom property "SenderID" values and the second sends messages with the same custom property "SenderID" values. The first succeeds and the second fails. The JMS queue has a redelivery value greater than 0.
> package jmstest;
>
> import static org.junit.Assert.assertTrue;
> import java.io.Serializable;
> import java.util.Hashtable;
> import java.util.Map;
> import java.util.Properties;
> import javax.jms.DeliveryMode;
> import javax.jms.JMSException;
> import javax.jms.MessageProducer;
> import javax.jms.ObjectMessage;
> import javax.jms.Queue;
> import javax.jms.QueueConnection;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.QueueReceiver;
> import javax.jms.QueueSession;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
> import org.apache.log4j.Logger;
> import org.junit.After;
> import org.junit.AfterClass;
> import org.junit.Before;
> import org.junit.BeforeClass;
> import org.junit.Test;
>
> public class JMSQueueTest
> {
> private static Logger log = Logger.getLogger(JMSQueueTest.class);
>
> private String jmsServer = System.getProperty("JMSSERVER");
>
> private String jmsQueueName = System.getProperty("QUEUE");
>
> /**
>  * Class-level fixture hook, run once before the tests in this class. Currently a no-op.
>  */
> @BeforeClass
> public static void setUpBeforeClass() throws Exception
> {
>     // Nothing to initialise at class scope.
> }
>
> /**
>  * Class-level fixture hook, run once after the tests in this class. Currently a no-op.
>  */
> @AfterClass
> public static void tearDownAfterClass() throws Exception
> {
>     // Nothing to release at class scope.
> }
>
> /**
>  * Per-test fixture: logs the clean-up step and drains the queue so each test starts
>  * from whatever state {@link #clearQueue()} leaves behind.
>  */
> @Before
> public void setUp() throws Exception
> {
>     log.info("setUp - cleaning up");
>     this.clearQueue();
> }
>
> /**
>  * Per-test clean-up: logs the clean-up step and drains the queue via
>  * {@link #clearQueue()} after each test.
>  */
> @After
> public void tearDown() throws Exception
> {
>     log.info("tearDown - cleaning up");
>     this.clearQueue();
> }
>
> public void clearQueue() throws Exception
> {
> boolean areThereMore = true;
> while (areThereMore)
> {
> try
> {
> ObjectMessage message = null;
> message = getNextMessage(jmsServer, jmsQueueName, false, 1, null);
> if (message != null && message.getObject() != null)
> {
> log.debug("There's more");
> }
> else
> {
> log.debug("No more.");
> areThereMore = false;
> }
> }
> catch (Exception e)
> {
> log.error(e.getMessage(), e);
> }
> }
> }
>
> @Test
> public void testJMSQueueSucceeds() throws Exception
> {
> ObjectMessage om = null;
> try
> {
> // /////////////////////////////////////////////////////////////////////////
> // Send a couple of messages
> // /////////////////////////////////////////////////////////////////////////
> log.info("--------------------------------------------------------------------------------");
> log.info("sending two messages");
> this.sendJMSMessage("test1");
> this.sendJMSMessage("test2");
> log.info("--------------------------------------------------------------------------------");
> log.debug("Manually retrieving messages");
> om = this.getNextMessage(jmsServer, jmsQueueName, false, 1, "senderID = 'test1'");
> assertTrue(om != null);
> om = this.getNextMessage(jmsServer, jmsQueueName, false, 1, "senderID = 'test2'");
> assertTrue(om != null);
> }
> catch (Exception e)
> {
> log.error(e.getMessage(), e);
> }
> finally
> {
> }
> log.debug("Done.");
> }
>
> @Test
> public void testJMSQueueFails() throws Exception
> {
> ObjectMessage om = null;
> try
> {
> // /////////////////////////////////////////////////////////////////////////
> // Send a couple of messages
> // /////////////////////////////////////////////////////////////////////////
> log.info("--------------------------------------------------------------------------------");
> log.info("sending two messages");
> this.sendJMSMessage("test1");
> this.sendJMSMessage("test1");
> log.info("--------------------------------------------------------------------------------");
> log.debug("Manually retrieving messages");
> om = this.getNextMessage(jmsServer, jmsQueueName, false, 1, "senderID = 'test1'");
> assertTrue(om != null);
> om = this.getNextMessage(jmsServer, jmsQueueName, false, 1, "senderID = 'test1'");
> assertTrue(om != null);
> }
> catch (Exception e)
> {
> log.error(e.getMessage(), e);
> }
> finally
> {
> }
> log.debug("Done.");
> }
>
> /**
>  * Sends one fixed-text test message to the configured queue, tagging it with a
>  * {@code senderID} property when one is supplied.
>  *
>  * <p>Send failures propagate to the caller (the method already declares
>  * {@code throws Exception}) rather than being logged and dropped, so a test cannot
>  * continue as though a message had been sent when it had not.
>  *
>  * @param senderID value for the {@code senderID} message property; when {@code null}
>  *                 no property is set
>  * @throws Exception if the JNDI lookup or the JMS send fails
>  */
> private void sendJMSMessage(String senderID) throws Exception
> {
>     log.debug("Sending...");
>     String message = "This is a test";
>     Hashtable<String, Object> properties = new Hashtable<String, Object>();
>     if (senderID != null)
>     {
>         properties.put("senderID", senderID);
>         log.debug("senderID = " + senderID);
>     }
>     // String is already Serializable, so no cast is needed.
>     this.sendMessageX(jmsServer, jmsQueueName, true, message, 20, null, 0, properties, DeliveryMode.NON_PERSISTENT, 4);
>     log.debug("Message sent");
> }
>
> /**
>  * Opens a fresh JNDI context and queue connection, sends {@code message} as an
>  * {@link ObjectMessage} to the named queue, and releases the connection and context.
>  *
>  * @param jmsServer         JNDI provider URL
>  * @param jmsQueueName      JNDI name of the destination queue
>  * @param clientAcknowledge {@code true} to create the session with CLIENT_ACKNOWLEDGE,
>  *                          {@code false} for AUTO_ACKNOWLEDGE
>  * @param message           payload of the object message
>  * @param ttl               message time-to-live in seconds
>  * @param replyTo           optional reply destination; when non-null its queue is set as
>  *                          JMSReplyTo and its server/queue names are copied into the
>  *                          {@code replyToServer} / {@code replyToQueue} properties
>  * @param replyWaitSeconds  not read by this method
>  * @param properties        optional object properties to copy onto the message
>  * @param persistence       JMS delivery mode passed to {@code send}
>  * @param priority          JMS priority passed to {@code send}
>  * @throws NamingException if the context cannot be created or a lookup fails
>  * @throws JMSException    if any JMS operation fails
>  */
> private void sendMessageX(String jmsServer, String jmsQueueName, boolean clientAcknowledge, Serializable message, int ttl, JMSQueue replyTo, int replyWaitSeconds, Map<String, Object> properties, int persistence, int priority) throws NamingException, JMSException
> {
>     InitialContext ctx = null;
>     QueueConnection conn = null;
>     try
>     {
>         Properties ctxProperties = new Properties();
>         ctxProperties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
>         ctxProperties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
>         ctxProperties.put(Context.PROVIDER_URL, jmsServer);
>         ctx = new InitialContext(ctxProperties);
>
>         QueueConnectionFactory tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
>         conn = tcf.createQueueConnection();
>         Queue queue = (Queue) ctx.lookup(jmsQueueName);
>         conn.start();
>
>         int ackMode = clientAcknowledge ? QueueSession.CLIENT_ACKNOWLEDGE : QueueSession.AUTO_ACKNOWLEDGE;
>         QueueSession session = conn.createQueueSession(false, ackMode);
>
>         log.debug("Start sendMessage()");
>         MessageProducer producer = session.createProducer(queue);
>         ObjectMessage objMessage = session.createObjectMessage(message);
>         if (properties != null)
>         {
>             for (Map.Entry<String, Object> entry : properties.entrySet())
>             {
>                 objMessage.setObjectProperty(entry.getKey(), entry.getValue());
>             }
>         }
>         if (replyTo != null)
>         {
>             objMessage.setJMSReplyTo(replyTo.getJMSQueue());
>             objMessage.setStringProperty("replyToServer", replyTo.getJmsServer());
>             objMessage.setStringProperty("replyToQueue", replyTo.getJmsQueueName());
>         }
>
>         // Long arithmetic avoids int overflow for large ttl values.
>         long ttlMillis = ttl * 1000L;
>         producer.setTimeToLive(ttlMillis);
>         log.debug("-------------Sending Message");
>         // The four-argument send takes its own time-to-live; passing 0 here would
>         // override the producer default set above and make the ttl parameter a no-op.
>         producer.send(objMessage, persistence, priority, ttlMillis);
>         producer.close();
>     }
>     finally
>     {
>         // Closing the connection also closes its sessions and producers, so only the
>         // connection and the naming context need explicit, null-safe clean-up.
>         if (conn != null)
>         {
>             try
>             {
>                 conn.close();
>             }
>             catch (Exception e)
>             {
>                 log.error(e.getMessage(), e);
>             }
>         }
>         if (ctx != null)
>         {
>             try
>             {
>                 ctx.close();
>             }
>             catch (Exception e)
>             {
>                 log.error(e.getMessage(), e);
>             }
>         }
>     }
> }
>
> /**
>  * Opens a fresh JNDI context and queue connection, performs a single timed receive on
>  * the named queue (optionally filtered by a selector), and releases the connection and
>  * context before returning.
>  *
>  * @param jmsServer         JNDI provider URL
>  * @param jmsQueueName      JNDI name of the queue to read from
>  * @param clientAcknowledge {@code true} to create the session with CLIENT_ACKNOWLEDGE,
>  *                          {@code false} for AUTO_ACKNOWLEDGE; this method never calls
>  *                          {@code acknowledge()} itself
>  * @param queueWaitSeconds  how long to wait for a message, in seconds
>  * @param selector          JMS message selector, or {@code null} to receive unfiltered
>  * @return the received message, or {@code null} if none arrived within the wait time
>  * @throws NamingException if the context cannot be created or a lookup fails
>  * @throws JMSException    if any JMS operation fails
>  */
> private ObjectMessage getNextMessage(String jmsServer, String jmsQueueName, boolean clientAcknowledge, int queueWaitSeconds, String selector) throws NamingException, JMSException
> {
>     InitialContext ctx = null;
>     QueueConnection conn = null;
>     try
>     {
>         Properties properties = new Properties();
>         properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
>         properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
>         properties.put(Context.PROVIDER_URL, jmsServer);
>         ctx = new InitialContext(properties);
>
>         QueueConnectionFactory tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
>         conn = tcf.createQueueConnection();
>         Queue queue = (Queue) ctx.lookup(jmsQueueName);
>         conn.start();
>
>         QueueSession session;
>         if (clientAcknowledge)
>         {
>             log.debug("CLIENT_ACKNOWLEDGE");
>             session = conn.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
>         }
>         else
>         {
>             log.debug("AUTO_ACKNOWLEDGE");
>             session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
>         }
>
>         log.debug("Start getNextMessage()");
>         QueueReceiver consumer;
>         if (selector == null)
>         {
>             log.debug("Not using a selector");
>             consumer = session.createReceiver(queue);
>         }
>         else
>         {
>             log.debug("Using a selector");
>             consumer = session.createReceiver(queue, selector);
>         }
>
>         // Long arithmetic avoids int overflow for large wait values.
>         ObjectMessage message = (ObjectMessage) consumer.receive(queueWaitSeconds * 1000L);
>         consumer.close();
>         if (message == null)
>         {
>             log.debug("---No message found in " + queueWaitSeconds + " seconds");
>             return null;
>         }
>         log.debug("----------Got a message");
>         return message;
>     }
>     finally
>     {
>         // Closing the connection also closes its sessions and consumers, so only the
>         // connection and the naming context need explicit, null-safe clean-up.
>         if (conn != null)
>         {
>             try
>             {
>                 conn.close();
>             }
>             catch (Exception e)
>             {
>                 log.error(e.getMessage(), e);
>             }
>         }
>         if (ctx != null)
>         {
>             try
>             {
>                 ctx.close();
>             }
>             catch (Exception e)
>             {
>                 log.error(e.getMessage(), e);
>             }
>         }
>     }
> }
>
--------------------------------------------------------------
Reply to this message by going to Community
[http://community.jboss.org/message/574516#574516]
Start a new discussion in JBoss Messaging at Community
[http://community.jboss.org/choose-container!input.jspa?contentType=1&containerType=14&container=2042]
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/jboss-user/attachments/20101206/21882d2d/attachment-0001.html
More information about the jboss-user
mailing list