[jboss-user] [JBoss Messaging] - Re: Connecting to two JBoss messaging servers causes interfe

bander do-not-reply at jboss.com
Wed Feb 21 23:39:02 EST 2007


After some re-testing of this issue I can confirm it is still present.

I've developed a new test case that creates a message producer and consumer on two separate JBoss Messaging servers. By continually shutting down and restarting each JBoss Messaging server I can eventually cause the shutting down of one server to stop the message listeners on the *other* server.

This test case also demonstrates the same reconnection issue that I raised in this post: http://www.jboss.org/index.html?module=bb&op=viewtopic&t=102233

Our JBoss Messaging servers have different ServerPeerIDs and StoreIds as suggested elsewhere.

To run the test case, start both JBoss Messaging servers then start the test case. A producer will be created on each server and will start dispatching messages to a queue. A message listener on the queue will acknowledge the dispatched message. Stop one of the servers for a while then restart it. The producer and consumer on that server should start up again. Quite often they do not.


  | import java.util.Hashtable;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.ExceptionListener;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | 
  | import org.apache.commons.logging.Log;
  | import org.apache.commons.logging.LogFactory;
  | 
  | public class MultipleServerReconnectTest {
  | 
  | 	class DispatcherThread extends Thread {
  | 		private ConnectionFactory connectionFactory;
  | 
  | 		private String id;
  | 
  | 		private boolean initialised = false;
  | 
  | 		private Queue queue;
  | 
  | 		private boolean recycle = false;
  | 
  | 		private boolean shutdown = false;
  | 
  | 		public DispatcherThread(ConnectionFactory connectionFactory,//
  | 				Queue queue, String id) {
  | 			super();
  | 			this.connectionFactory = connectionFactory;
  | 			this.queue = queue;
  | 			this.id = id;
  | 			this.setName(id);
  | 		}
  | 
  | 		private boolean isRecycle() {
  | 			return recycle;
  | 		}
  | 
  | 		public void run() {
  | 			Connection connection = null;
  | 			Session session = null;
  | 			MessageProducer producer = null;
  | 			ExceptionListener exceptionListener = null;
  | 
  | 			while (!shutdown) {
  | 				if (!initialised) {
  | 					try {
  | 						connection = connectionFactory.createConnection();
  | 						exceptionListener = new ExceptionListener() {
  | 							public void onException(JMSException ex) {
  | 								LOG.error("Received connection exception", ex);
  | 								recycle = true;
  | 							}
  | 						};
  | 						connection.setExceptionListener(exceptionListener);
  | 						session = connection.createSession(false,
  | 								Session.AUTO_ACKNOWLEDGE);
  | 						producer = session.createProducer(queue);
  | 						LOG.info(id + " initialised");
  | 						initialised = true;
  | 					} catch (JMSException ex) {
  | 						LOG.error("Caught exception during initialisation", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 				if (isRecycle()) {
  | 					if (producer != null) {
  | 						try {
  | 							producer.close();
  | 						} catch (Exception ex) {
  | 							LOG.error("Caught exception during producer close",
  | 									ex);
  | 						}
  | 					}
  | 					if (session != null) {
  | 						try {
  | 							session.close();
  | 						} catch (Exception ex) {
  | 							LOG.error("Caught exception during session close",
  | 									ex);
  | 						}
  | 					}
  | 					if (connection != null) {
  | 						try {
  | 							connection.close();
  | 						} catch (Exception ex) {
  | 							LOG.error(
  | 									"Caught exception during connection close",
  | 									ex);
  | 						}
  | 					}
  | 					producer = null;
  | 					session = null;
  | 					connection = null;
  | 					initialised = false;
  | 					recycle = false;
  | 				}
  | 				if (initialised && (!recycle) && (!shutdown)) {
  | 					try {
  | 						Thread.sleep(1000);
  | 						Message message = session
  | 								.createTextMessage("This is a test");
  | 						producer.send(message);
  | 						LOG.info(id + " dispatched message");
  | 					} catch (Exception ex) {
  | 						LOG.error("Caught exception during send", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 			}
  | 		}
  | 
  | 		public void shutdown() {
  | 			LOG.info(id + " is shutting down");
  | 			recycle = true;
  | 			shutdown = true;
  | 		}
  | 	}
  | 
  | 	class ListenerManagerThread extends Thread {
  | 		private ConnectionFactory connectionFactory;
  | 
  | 		private String id;
  | 
  | 		private boolean initialised = false;
  | 
  | 		private MessageListener messageListener;
  | 
  | 		private Queue queue;
  | 
  | 		private boolean recycle = false;
  | 
  | 		private boolean shutdown = false;
  | 
  | 		public ListenerManagerThread(ConnectionFactory connectionFactory,
  | 				Queue queue, MessageListener messageListener, String id) {
  | 			super();
  | 			this.connectionFactory = connectionFactory;
  | 			this.queue = queue;
  | 			this.messageListener = messageListener;
  | 			this.id = id;
  | 			this.setName(id);
  | 		}
  | 
  | 		private boolean isRecycle() {
  | 			return recycle;
  | 		}
  | 
  | 		public void run() {
  | 			Connection connection = null;
  | 			Session session = null;
  | 			MessageConsumer consumer = null;
  | 			ExceptionListener exceptionListener = null;
  | 
  | 			while (!shutdown) {
  | 				if (!initialised) {
  | 					try {
  | 						connection = connectionFactory.createConnection();
  | 						exceptionListener = new ExceptionListener() {
  | 							public void onException(JMSException ex) {
  | 								LOG.error("Received connection exception", ex);
  | 								recycle = true;
  | 							}
  | 						};
  | 						connection.setExceptionListener(exceptionListener);
  | 						session = connection.createSession(false,
  | 								Session.AUTO_ACKNOWLEDGE);
  | 						consumer = session.createConsumer(queue);
  | 						consumer.setMessageListener(messageListener);
  | 						connection.start();
  | 						LOG.info(id + " initialised");
  | 						initialised = true;
  | 					} catch (JMSException ex) {
  | 						LOG.error("Caught exception during initialisation", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 				if (isRecycle()) {
  | 					if (consumer != null) {
  | 						try {
  | 							consumer.setMessageListener(null);
  | 							consumer.close();
  | 						} catch (Exception ex) {
  | 							LOG.error("Caught exception during consumer close",
  | 									ex);
  | 						}
  | 					}
  | 					if (session != null) {
  | 						try {
  | 							session.close();
  | 						} catch (Exception ex) {
  | 							LOG.error("Caught exception during session close",
  | 									ex);
  | 						}
  | 					}
  | 					if (connection != null) {
  | 						try {
  | 							connection.close();
  | 						} catch (Exception ex) {
  | 							LOG.error(
  | 									"Caught exception during connection close",
  | 									ex);
  | 						}
  | 					}
  | 					consumer = null;
  | 					session = null;
  | 					connection = null;
  | 					initialised = false;
  | 					recycle = false;
  | 				}
  | 				try {
  | 					Thread.sleep(1000);
  | 				} catch (InterruptedException ex) {
  | 					LOG.error("Caught exception during sleep");
  | 				}
  | 			}
  | 		}
  | 
  | 		public void shutdown() {
  | 			LOG.info(id + " is shutting down");
  | 			recycle = true;
  | 			shutdown = true;
  | 		}
  | 	}
  | 
  | 	/**
  | 	 * Message listener that does nothing with a delivered message except log
  | 	 * that it arrived, prefixed with the identifier given at construction.
  | 	 */
  | 	class SimpleListener implements MessageListener {
  | 
  | 		/** Identifier placed at the start of every log line. */
  | 		private final String listenerId;
  | 
  | 		/**
  | 		 * @param id identifier used as prefix in log output
  | 		 */
  | 		public SimpleListener(String id) {
  | 			listenerId = id;
  | 		}
  | 
  | 		/**
  | 		 * Logs the arrival of a message; the message content is not inspected.
  | 		 * 
  | 		 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
  | 		 */
  | 		public void onMessage(Message message) {
  | 			final String logLine = listenerId + " received message";
  | 			LOG.info(logLine);
  | 		}
  | 
  | 	}
  | 
  | 	/** Logger shared by the test driver and all of its worker threads. */
  | 	private static final Log LOG = LogFactory.getLog(MultipleServerReconnectTest.class);
  | 
  | 	public static void main(String[] args) {
  | 		MultipleServerReconnectTest test = new MultipleServerReconnectTest();
  | 
  | 		try {
  | 			test.start();
  | 		} catch (Throwable ex) {
  | 			LOG.error("Caught exception in main", ex);
  | 		}
  | 	}
  | 
  | 	/**
  | 	 * Runs the scenario: looks up a connection factory and the queue
  | 	 * "/queue/tc1_q1" on each of the two servers, starts one listener manager
  | 	 * and one dispatcher per server, lets them run for ten minutes and then
  | 	 * stops them.
  | 	 * 
  | 	 * The worker threads are stopped and the JNDI contexts closed even when a
  | 	 * lookup fails or the wait is interrupted, so a failed run does not leave
  | 	 * worker threads behind that keep the JVM alive.
  | 	 * 
  | 	 * @throws Exception if a JNDI lookup fails or the calling thread is
  | 	 *             interrupted while waiting
  | 	 */
  | 	private void start() throws Exception {
  | 		/*
  | 		 * If you want to run the following test case under ActiveMQ 3.2.1 then
  | 		 * only the following properties are required for each connection:
  | 		 * 
  | 		 * Context.INITIAL_CONTEXT_FACTORY =
  | 		 * "org.activemq.jndi.ActiveMQInitialContextFactory"
  | 		 * Context.PROVIDER_URL = "tcp://localhost:61616" (connection 1)
  | 		 * Context.PROVIDER_URL = "tcp://localhost:61617" (connection 2)
  | 		 * 
  | 		 * For ActiveMQ 4.1.0 the required context factory is
  | 		 * org.apache.activemq.jndi.ActiveMQInitialContextFactory
  | 		 */
  | 		// Setup connection 1
  | 		Hashtable properties1 = createJndiProperties("jnp://localhost:1099");
  | 
  | 		// Setup connection 2
  | 		// change the following url to point to your second jboss instance
  | 		Hashtable properties2 = createJndiProperties("jnp://otherhost:1099");
  | 
  | 		Context context1 = null;
  | 		Context context2 = null;
  | 		ListenerManagerThread manager1 = null;
  | 		DispatcherThread dispatcher1 = null;
  | 		ListenerManagerThread manager2 = null;
  | 		DispatcherThread dispatcher2 = null;
  | 
  | 		try {
  | 			context1 = new InitialContext(properties1);
  | 			ConnectionFactory connectionFactory1 = (ConnectionFactory) context1
  | 					.lookup("ConnectionFactory");
  | 			// Make sure this queue has been configured on your jboss server
  | 			// (under ActiveMQ use "dynamicQueues/testQueue")
  | 			Queue queue1 = (Queue) context1.lookup("/queue/tc1_q1");
  | 
  | 			context2 = new InitialContext(properties2);
  | 			ConnectionFactory connectionFactory2 = (ConnectionFactory) context2
  | 					.lookup("ConnectionFactory");
  | 			// Make sure this queue has been configured on your jboss server
  | 			// (under ActiveMQ use "dynamicQueues/testQueue")
  | 			Queue queue2 = (Queue) context2.lookup("/queue/tc1_q1");
  | 
  | 			MessageListener listener1 = new SimpleListener("Listener.1");
  | 			manager1 = new ListenerManagerThread(connectionFactory1, queue1,
  | 					listener1, "ListenerManager.1");
  | 			manager1.start();
  | 
  | 			dispatcher1 = new DispatcherThread(connectionFactory1, queue1,
  | 					"Dispatcher.1");
  | 			dispatcher1.start();
  | 
  | 			MessageListener listener2 = new SimpleListener("Listener.2");
  | 			manager2 = new ListenerManagerThread(connectionFactory2, queue2,
  | 					listener2, "ListenerManager.2");
  | 			manager2.start();
  | 
  | 			dispatcher2 = new DispatcherThread(connectionFactory2, queue2,
  | 					"Dispatcher.2");
  | 			dispatcher2.start();
  | 
  | 			// 10 minutes
  | 			Thread.sleep(600000);
  | 		} finally {
  | 			try {
  | 				// Stop the workers in the order they were started; a worker
  | 				// that was never created is simply skipped.
  | 				if (manager1 != null) {
  | 					manager1.shutdown();
  | 					manager1.join();
  | 				}
  | 				if (dispatcher1 != null) {
  | 					dispatcher1.shutdown();
  | 					dispatcher1.join();
  | 				}
  | 				if (manager2 != null) {
  | 					manager2.shutdown();
  | 					manager2.join();
  | 				}
  | 				if (dispatcher2 != null) {
  | 					dispatcher2.shutdown();
  | 					dispatcher2.join();
  | 				}
  | 			} finally {
  | 				closeContext(context1);
  | 				closeContext(context2);
  | 			}
  | 		}
  | 	}
  | 
  | 	/**
  | 	 * Builds the JNDI environment for one JBoss naming server. All settings
  | 	 * except the provider URL are the same for both servers used by the test.
  | 	 * 
  | 	 * @param providerUrl value for Context.PROVIDER_URL
  | 	 * @return a new environment table for InitialContext
  | 	 */
  | 	private Hashtable createJndiProperties(String providerUrl) {
  | 		Hashtable properties = new Hashtable();
  | 		properties.put(Context.INITIAL_CONTEXT_FACTORY,
  | 				"org.jnp.interfaces.NamingContextFactory");
  | 		properties.put(Context.URL_PKG_PREFIXES,
  | 				"org.jboss.naming:org.jnp.interfaces");
  | 		properties.put(Context.PROVIDER_URL, providerUrl);
  | 		properties.put(Context.SECURITY_PRINCIPAL, "admin");
  | 		properties.put(Context.SECURITY_CREDENTIALS, "admin");
  | 		return properties;
  | 	}
  | 
  | 	/**
  | 	 * Closes a JNDI context if one was created. A failure is logged rather
  | 	 * than thrown so that the remaining cleanup still takes place.
  | 	 */
  | 	private void closeContext(Context context) {
  | 		if (context == null) {
  | 			return;
  | 		}
  | 		try {
  | 			context.close();
  | 		} catch (Exception ex) {
  | 			LOG.error("Caught exception during context close", ex);
  | 		}
  | 	}
  | }
  | 

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4020399#4020399

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4020399



More information about the jboss-user mailing list