[jboss-user] [JBoss Messaging Users] - Re: JMS connection/session handling

theamazingtoby do-not-reply at jboss.com
Thu Sep 17 15:29:26 EDT 2009


Cheers. I'm working with Bob on this issue, and we've found something when keeping connections, sessions, and consumers open in our receiver. Below is the code for our sender and our receiver. The receiver continues to check the JMS queue when the queue is stopped and started, and also when the JBoss server is shut down and restarted. On a queue stop/start, the receiver continues to act as if it's actually trying to get messages. It does not raise an error. When the queue comes back up, the receiver will receive any new messages that arrive after the restart. On a JBoss restart, the receiver does pretty much the same, with no notification of the loss of connectivity, but after the restart the receiver does NOT receive any new messages that arrive. Any ideas on how we can make the receiver realize the server is down so it can handle reconnecting?


Sender code:

  | package com.valpak.renderingservice.indesign.unittests;
  | 
  | import java.util.Properties;
  | 
  | import javax.jms.DeliveryMode;
  | import javax.jms.JMSException;
  | import javax.jms.ObjectMessage;
  | import javax.jms.Queue;
  | import javax.jms.QueueConnection;
  | import javax.jms.QueueConnectionFactory;
  | import javax.jms.QueueReceiver;
  | import javax.jms.QueueSender;
  | import javax.jms.QueueSession;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | import javax.naming.NamingException;
  | 
  | import org.apache.log4j.Logger;
  | 
  | /**
  |  * Test client that sends one request to a JMS queue every ten seconds and
  |  * waits briefly for a reply on a temporary queue.
  |  *
  |  * Each send uses its own JNDI context, connection and session, all of which
  |  * are released before the method returns.
  |  */
  | public class JMSSender {
  | 	// Log under this class's own name (previously logged as JMSReceiver).
  | 	private static Logger log = Logger.getLogger(JMSSender.class);
  | 	// Placeholders from the original post; substitute real string values.
  | 	private String jmsServer = <jms queue server URL>;
  | 	private String jmsQueue = <queue name>;
  | 
  | 	/**
  | 	 * Sends a message every ten seconds until the thread is interrupted.
  | 	 *
  | 	 * @param args unused
  | 	 */
  | 	public static void main(String[] args) {
  | 		JMSSender jmss = new JMSSender();
  | 		try {
  | 			while (true) {
  | 				log.debug("Sending a message");
  | 				jmss.sendMessage();
  | 				Thread.sleep(10000);
  | 			}
  | 		} catch (InterruptedException e) {
  | 			// Restore the flag so code further up the stack can see the interrupt.
  | 			Thread.currentThread().interrupt();
  | 			log.warn("Sender interrupted; stopping", e);
  | 		}
  | 
  | 	}
  | 
  | 	/**
  | 	 * Looks up the connection factory and queue, sends one non-persistent
  | 	 * ObjectMessage with a temporary reply queue, and waits up to three
  | 	 * seconds for a reply. JNDI and JMS failures are logged, not rethrown.
  | 	 */
  | 	private void sendMessage() {
  | 		InitialContext ctx = null;
  | 		QueueConnection conn = null;
  | 		QueueConnectionFactory tcf = null;
  | 		Queue queue = null;
  | 		QueueSession session = null;
  | 		QueueSender sender = null;
  | 		QueueReceiver receiver = null;
  | 		Properties properties = new Properties();
  | 		int replyWaitMS = 3000;
  | 		ObjectMessage message = null;
  | 
  | 		try {
  | 			properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  | 			properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
  | 			properties.put(Context.PROVIDER_URL, jmsServer);
  | 			ctx = new InitialContext(properties);
  | 			tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
  | 			conn = tcf.createQueueConnection();
  | 			queue = (Queue) ctx.lookup(jmsQueue);
  | 			conn.start();
  | 			session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
  | 			sender = session.createSender(queue);
  | 			sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  | 			String messageBody = "You're an idiot!";
  | 			message = session.createObjectMessage(messageBody);
  | 			Queue tempQueue = session.createTemporaryQueue();
  | 			if (tempQueue != null) {
  | 				message.setJMSReplyTo(tempQueue);
  | 				// The request is only useful while someone is still waiting for the reply.
  | 				sender.setTimeToLive(replyWaitMS * 20);
  | 			} else {
  | 				sender.setTimeToLive(60 * 60 * 1000); // 60 minutes
  | 			}
  | 			sender.send(message);
  | 
  | 			if (tempQueue != null) {
  | 				receiver = session.createReceiver(tempQueue);
  | 				Object responseMessage = receiver.receive(replyWaitMS);
  | 				// Check the type before casting: an unexpected message type used to
  | 				// raise an uncaught ClassCastException and end the send loop.
  | 				if (responseMessage instanceof ObjectMessage) {
  | 					Object responseObject = ((ObjectMessage) responseMessage).getObject();
  | 					if (responseObject instanceof String) {
  | 						log.debug(responseObject);
  | 					}
  | 				} else if (responseMessage != null) {
  | 					log.warn("Ignoring reply of unexpected type " + responseMessage.getClass().getName());
  | 				}
  | 			}
  | 		} catch (NamingException e) {
  | 			log.error("JNDI lookup failed", e);
  | 		} catch (JMSException e) {
  | 			log.error("JMS send/receive failed", e);
  | 		} finally {
  | 			close(conn, session, receiver, sender);
  | 			// The JNDI context holds its own resources and was never released before.
  | 			if (ctx != null) {
  | 				try {
  | 					ctx.close();
  | 				} catch (NamingException e) {
  | 					log.warn("Error closing JNDI context", e);
  | 				}
  | 			}
  | 		}
  | 	}
  | 
  | 	/**
  | 	 * Closes the given JMS resources, children before parents. Any argument
  | 	 * may be null. A failure to close one resource is logged and does not
  | 	 * prevent the remaining ones from being closed.
  | 	 *
  | 	 * @param conn     connection to close, or null
  | 	 * @param session  session to close, or null
  | 	 * @param receiver receiver to close, or null
  | 	 * @param sender   sender to close, or null
  | 	 */
  | 	private void close(QueueConnection conn, QueueSession session, QueueReceiver receiver, QueueSender sender) {
  | 		try {
  | 			if (receiver != null) {
  | 				receiver.close();
  | 			}
  | 		} catch (JMSException e) {
  | 			log.warn("Error closing receiver", e);
  | 		}
  | 		try {
  | 			if (sender != null) {
  | 				sender.close();
  | 			}
  | 		} catch (JMSException e) {
  | 			log.warn("Error closing sender", e);
  | 		}
  | 		try {
  | 			if (session != null) {
  | 				session.close();
  | 			}
  | 		} catch (JMSException e) {
  | 			log.warn("Error closing session", e);
  | 		}
  | 		try {
  | 			if (conn != null) {
  | 				conn.close();
  | 			}
  | 		} catch (JMSException e) {
  | 			log.warn("Error closing connection", e);
  | 		}
  | 	}
  | }
  | 

And the receiver code:

  | package com.valpak.renderingservice.indesign.unittests;
  | 
  | import java.io.Serializable;
  | import java.util.Properties;
  | 
  | import javax.jms.DeliveryMode;
  | import javax.jms.Destination;
  | import javax.jms.ExceptionListener;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageProducer;
  | import javax.jms.ObjectMessage;
  | import javax.jms.Queue;
  | import javax.jms.QueueConnection;
  | import javax.jms.QueueConnectionFactory;
  | import javax.jms.QueueReceiver;
  | import javax.jms.QueueSession;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | import javax.naming.NamingException;
  | 
  | import org.apache.log4j.Logger;
  | 
  | public class JMSReceiver {
  | 	private static Logger log = Logger.getLogger(JMSReceiver.class);
  | 	private String jmsServer = <jms server URL>;
  | 	private String jmsQueue = <queue name>;
  | 	private int queueWaitSeconds = 3;
  | 	InitialContext ctx = null;
  | 	QueueConnection conn = null;
  | 	QueueConnectionFactory tcf = null;
  | 	Queue queue = null;
  | 	QueueSession session = null;
  | 	QueueReceiver consumer = null;
  | 
  | 	public static void main(String[] args) {
  | 		// Check for messages
  | 		JMSReceiver jmsr = new JMSReceiver();
  | 		try {
  | 			jmsr.initialize();
  | 			while (true) {
  | 				ObjectMessage message = jmsr.checkQueue();
  | 				if (message != null) {
  | 					if (message.getObject() instanceof String) {
  | 						// Output what we got
  | 						log.debug(message.getObject());
  | 						// Send a reply
  | 						String reply = "I know you are, but what am I?";
  | 						jmsr.sendReplyMessage(message.getJMSReplyTo(), reply);
  | 					}
  | 				} else {
  | 					log.debug("No message. Sleeping");
  | 					Thread.sleep(6000);
  | 				}
  | 			}
  | 		} catch (Exception e) {
  | 			e.printStackTrace();
  | 		}
  | 
  | 	}
  | 
  | 	private void initialize() throws Exception {
  | 		Properties properties = new Properties();
  | 		properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  | 		properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
  | 		properties.put(Context.PROVIDER_URL, jmsServer);
  | 		ctx = new InitialContext(properties);
  | 		tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
  | 		conn = tcf.createQueueConnection();
  | 		queue = (Queue) ctx.lookup(jmsQueue);
  | 		conn.start();
  | 		session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
  | 		consumer = session.createReceiver(queue);
  | 	}
  | 
  | 	private ObjectMessage checkQueue() {
  | 
  | 		try {
  | 			ObjectMessage message = (ObjectMessage) consumer.receive(queueWaitSeconds * 1000);
  | 			if (message != null) {
  | 				log.debug("got a message");
  | 				return message;
  | 			} else {
  | 				log.debug("No message found for " + queueWaitSeconds + " seconds");
  | 				return null;
  | 			}
  | 		} catch (Exception e) {
  | 			e.printStackTrace();
  | 		} finally {
  | 
  | 		}
  | 		return null;
  | 	}
  | 
  | 	private void close(QueueConnection conn, QueueSession session, QueueReceiver receiver, MessageProducer producer) {
  | 		try {
  | 			if (conn != null) {
  | 				conn.close();
  | 			}
  | 		} catch (JMSException e) {
  | 			e.printStackTrace();
  | 		}
  | 		try {
  | 			if (session != null) {
  | 				session.close();
  | 			}
  | 		} catch (JMSException e) {
  | 			e.printStackTrace();
  | 		}
  | 		try {
  | 			if (receiver != null) {
  | 				receiver.close();
  | 			}
  | 		} catch (JMSException e) {
  | 			e.printStackTrace();
  | 		}
  | 		try {
  | 			if (producer != null) {
  | 				producer.close();
  | 			}
  | 		} catch (JMSException e) {
  | 			e.printStackTrace();
  | 		}
  | 	}
  | 
  | 	private void sendReplyMessage(Destination replyTo, Serializable reply) {
  | 		InitialContext ctx = null;
  | 		QueueConnection conn = null;
  | 		QueueConnectionFactory tcf = null;
  | 		QueueSession session = null;
  | 		MessageProducer producer = null;
  | 
  | 		Properties properties = new Properties();
  | 		try {
  | 			properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  | 			properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
  | 			properties.put(Context.PROVIDER_URL, jmsServer);
  | 			ctx = new InitialContext(properties);
  | 			tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
  | 			conn = tcf.createQueueConnection();
  | 			conn.start();
  | 			session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
  | 			producer = session.createProducer(replyTo);
  | 			ObjectMessage response = session.createObjectMessage(reply);
  | 			response.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
  | 			producer.send(response);
  | 		} catch (NamingException e) {
  | 			e.printStackTrace();
  | 		} catch (JMSException e) {
  | 			e.printStackTrace();
  | 		} finally {
  | 			close(conn, session, null, producer);
  | 		}
  | 	}
  | 
  | }
  | 

View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4255824#4255824

Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4255824



More information about the jboss-user mailing list