[jboss-user] [JBoss Messaging] - 1.2.0.CR1 transparent node failover does not always work

bander do-not-reply at jboss.com
Fri Feb 23 01:49:43 EST 2007


I've been giving 1.2.0.CR1 a bit of a spin (JBoss 4.0.4, Win XP) and have found that failover breaks after a couple of iterations of stopping/starting servers.

I have created two nodes on my local machine according to the clustering guide (using a non-clustered queue). i.e. Node 0 is started via 'run -c messaging-node0' and Node 1 is started via 'run -c messaging-node1'.

After starting both nodes I start my test case and then selectively stop and restart each node. Failover usually occurs the first time a node is stopped but subsequent stops/starts will eventually put a stop to the message producer and message listener.

I've attached a test case if anyone wants to have a play with it in their environment. If you're seeing the dispatcher dispatching and message listener receiving messages then the test is passing - if either stop and then fail to restart then something has gone wrong.

It may be a configuration issue on my side - I'm pretty much using the default installation settings. 


  | import java.util.Hashtable;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.ExceptionListener;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | 
  | import org.apache.commons.logging.Log;
  | import org.apache.commons.logging.LogFactory;
  | 
  | public class ReconnectTest {
  | 
  | 	class DispatcherThread extends Thread {
  | 		private ConnectionFactory connectionFactory;
  | 
  | 		private String id;
  | 
  | 		private boolean initialised = false;
  | 
  | 		private Queue queue;
  | 
  | 		private boolean recycle = false;
  | 
  | 		private boolean shutdown = false;
  | 
  | 		public DispatcherThread(ConnectionFactory connectionFactory,
  | 				Queue queue, String id) {
  | 			super();
  | 			this.connectionFactory = connectionFactory;
  | 			this.queue = queue;
  | 			this.id = id;
  | 			this.setName(id);
  | 		}
  | 
  | 		private boolean isRecycle() {
  | 			return recycle;
  | 		}
  | 
  | 		public void run() {
  | 			Connection connection = null;
  | 			Session session = null;
  | 			MessageProducer producer = null;
  | 			ExceptionListener exceptionListener = null;
  | 
  | 			while (!shutdown) {
  | 				if (!initialised) {
  | 					try {
  | 						connection = connectionFactory.createConnection();
  | 						exceptionListener = new ExceptionListener() {
  | 							public void onException(JMSException ex) {
  | 								LOG.error("Received connection exception", ex);
  | 								recycle = true;
  | 							}
  | 						};
  | 						connection.setExceptionListener(exceptionListener);
  | 						session = connection.createSession(false,
  | 								Session.AUTO_ACKNOWLEDGE);
  | 						producer = session.createProducer(queue);
  | 						LOG.info(id + " initialised");
  | 						initialised = true;
  | 					} catch (JMSException ex) {
  | 						LOG.error("Caught exception during initialisation", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 				if (isRecycle()) {
  | 					JMSHelper.close(producer);
  | 					JMSHelper.close(session);
  | 					JMSHelper.close(connection);
  | 					initialised = false;
  | 					recycle = false;
  | 				}
  | 				if (initialised && (!recycle) && (!shutdown)) {
  | 					try {
  | 						Thread.sleep(1000);
  | 						Message message = session
  | 								.createTextMessage("This is a test");
  | 						producer.send(message);
  | 						LOG.info(id + " dispatched message");
  | 					} catch (Exception ex) {
  | 						LOG.error("Caught exception during send", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 			}
  | 		}
  | 
  | 		public void shutdown() {
  | 			LOG.info(id + " is shutting down");
  | 			recycle = true;
  | 			shutdown = true;
  | 		}
  | 	}
  | 
  | 	static class JMSHelper {
  | 		public static void close(Connection connection) {
  | 			if (connection != null) {
  | 				try {
  | 					connection.close();
  | 				} catch (Exception ex) {
  | 					LOG.error("Caught exception when closing connection", ex);
  | 				}
  | 				connection = null;
  | 			}
  | 		}
  | 
  | 		public static void close(MessageConsumer consumer) {
  | 			if (consumer != null) {
  | 				try {
  | 					consumer.close();
  | 				} catch (Exception ex) {
  | 					LOG.error("Caught exception when closing consumer", ex);
  | 				}
  | 			}
  | 			consumer = null;
  | 		}
  | 
  | 		public static void close(MessageProducer producer) {
  | 			if (producer != null) {
  | 				try {
  | 					producer.close();
  | 				} catch (Exception ex) {
  | 					LOG.error("Caught exception when closing producer", ex);
  | 				}
  | 			}
  | 			producer = null;
  | 		}
  | 
  | 		public static void close(Session session) {
  | 			if (session != null) {
  | 				try {
  | 					session.close();
  | 				} catch (Exception ex) {
  | 					LOG.error("Caught exception when closing session", ex);
  | 				}
  | 			}
  | 			session = null;
  | 		}
  | 	}
  | 
  | 	class ListenerManagerThread extends Thread {
  | 		private ConnectionFactory connectionFactory;
  | 
  | 		private String id;
  | 
  | 		private boolean initialised = false;
  | 
  | 		private MessageListener messageListener;
  | 
  | 		private Queue queue;
  | 
  | 		private boolean recycle = false;
  | 
  | 		private boolean shutdown = false;
  | 
  | 		public ListenerManagerThread(ConnectionFactory connectionFactory,
  | 				Queue queue, MessageListener messageListener, String id) {
  | 			super();
  | 			this.connectionFactory = connectionFactory;
  | 			this.queue = queue;
  | 			this.messageListener = messageListener;
  | 			this.id = id;
  | 			this.setName(id);
  | 		}
  | 
  | 		private boolean isRecycle() {
  | 			return recycle;
  | 		}
  | 
  | 		public void run() {
  | 			Connection connection = null;
  | 			Session session = null;
  | 			MessageConsumer consumer = null;
  | 			ExceptionListener exceptionListener = null;
  | 
  | 			while (!shutdown) {
  | 				if (!initialised) {
  | 					try {
  | 						connection = connectionFactory.createConnection();
  | 						exceptionListener = new ExceptionListener() {
  | 							public void onException(JMSException ex) {
  | 								LOG.error("Received connection exception", ex);
  | 								recycle = true;
  | 							}
  | 						};
  | 						connection.setExceptionListener(exceptionListener);
  | 						session = connection.createSession(false,
  | 								Session.AUTO_ACKNOWLEDGE);
  | 						consumer = session.createConsumer(queue);
  | 						consumer.setMessageListener(messageListener);
  | 						connection.start();
  | 						LOG.info(id + " initialised");
  | 						initialised = true;
  | 					} catch (JMSException ex) {
  | 						LOG.error("Caught exception during initialisation", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 				if (isRecycle()) {
  | 					JMSHelper.close(consumer);
  | 					JMSHelper.close(session);
  | 					JMSHelper.close(connection);
  | 					initialised = false;
  | 					recycle = false;
  | 				}
  | 				try {
  | 					Thread.sleep(1000);
  | 				} catch (InterruptedException ex) {
  | 					LOG.error("Caught exception during sleep");
  | 				}
  | 			}
  | 		}
  | 
  | 		public void shutdown() {
  | 			LOG.info(id + " is shutting down");
  | 			recycle = true;
  | 			shutdown = true;
  | 		}
  | 	}
  | 
  | 	class SimpleListener implements MessageListener {
  | 
  | 		private String id;
  | 
  | 		public SimpleListener(String id) {
  | 			super();
  | 			this.id = id;
  | 		}
  | 
  | 		public void onMessage(Message message) {
  | 			LOG.info(id + " received message");
  | 		}
  | 
  | 	}
  | 
  | 	private static final Log LOG = LogFactory.getLog(ReconnectTest.class);
  | 
  | 	public static void main(String[] args) {
  | 		ReconnectTest test = new ReconnectTest();
  | 
  | 		try {
  | 			test.start();
  | 		} catch (Throwable ex) {
  | 			LOG.error("Caught exception in main", ex);
  | 		}
  | 	}
  | 
  | 	private void start() throws Exception {
  | 		// Setup connection 1
  | 		Hashtable properties1 = new Hashtable();
  | 		properties1.put(Context.INITIAL_CONTEXT_FACTORY,
  | 				"org.jnp.interfaces.NamingContextFactory");
  | 		properties1.put(Context.URL_PKG_PREFIXES,
  | 				"org.jboss.naming:org.jnp.interfaces");
  | 		properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  | 		properties1.put(Context.SECURITY_PRINCIPAL, "admin");
  | 		properties1.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  | 		ConnectionFactory connectionFactory1 = null;
  | 		Queue queue1 = null;
  | 		Context context1 = null;
  | 
  | 		context1 = new InitialContext(properties1);
  | 		connectionFactory1 = (ConnectionFactory) context1
  | 				.lookup("ConnectionFactory");
  | 		queue1 = (Queue) context1.lookup("/queue/testQueue");
  | 
  | 		MessageListener listener1 = new SimpleListener("Listener.1");
  | 		ListenerManagerThread manager1 = new ListenerManagerThread(
  | 				connectionFactory1, queue1, listener1, "ListenerManager.1");
  | 		manager1.start();
  | 
  | 		DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1,
  | 				queue1, "Dispatcher.1");
  | 		dispatcher1.start();
  | 
  | 		Thread.sleep(Long.MAX_VALUE);
  | 
  | 		manager1.shutdown();
  | 		manager1.join();
  | 
  | 		dispatcher1.shutdown();
  | 		dispatcher1.join();
  | 
  | 		context1.close();
  | 	}
  | }
  | 



View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4020997#4020997

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4020997



More information about the jboss-user mailing list