[jboss-user] [JBoss Messaging] - 1.2.0.GA transparent node failover does not always work

bander do-not-reply at jboss.com
Mon Mar 5 01:43:30 EST 2007


Following on from this thread: http://www.jboss.org/index.html?module=bb&op=viewtopic&t=102491 

I'm currently experiencing multiple failover issues with the 1.2.0.GA release. I'm running two clustered nodes on my local machine (JB4.0.4, Win XP, JVM1.4.2) using all the default settings, following the clustered node instructions in the user guide.

After starting both messaging-node0 and messaging-node1 I start my test case (attached).

The first problem I have with the test case that I created is that the message listener does not receive any of the dispatched messages (the test case creates a message dispatcher and message listener - the dispatcher sends a message to a queue that the listener is attached to). This happens regardless of the queue type (i.e. clustered/non-clustered - in this case testDistributedQueue or testQueue). 
The only way I can get the listener to start receiving messages is to kill one of the nodes e.g. kill node0.

Initially I thought my listener may have ended up on a different node to the dispatcher, so it could not see the messages that were being dispatched but I thought JBoss Messaging handles this scenario?

The second issue is that it's pretty easy to stop messages being dispatched and received altogether by randomly stopping and starting the individual nodes e.g. stop both nodes and bring one back up - my test case was unable to get a connection after both nodes had been shut down.

I'm interested to know if anyone is seeing similar behaviour.

Ben


  | import java.util.Hashtable;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.ExceptionListener;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | 
  | import org.apache.commons.logging.Log;
  | import org.apache.commons.logging.LogFactory;
  | 
  | public class ReconnectTest {
  | 
  | 	class DispatcherThread extends Thread {
  | 		private ConnectionFactory connectionFactory;
  | 
  | 		private String id;
  | 
  | 		private boolean initialised = false;
  | 
  | 		private Queue queue;
  | 
  | 		private boolean recycle = false;
  | 
  | 		private boolean shutdown = false;
  | 
  | 		public DispatcherThread(ConnectionFactory connectionFactory,
  | 				Queue queue, String id) {
  | 			super();
  | 			this.connectionFactory = connectionFactory;
  | 			this.queue = queue;
  | 			this.id = id;
  | 			this.setName(id);
  | 		}
  | 
  | 		private boolean isRecycle() {
  | 			return recycle;
  | 		}
  | 
  | 		public void run() {
  | 			Connection connection = null;
  | 			Session session = null;
  | 			MessageProducer producer = null;
  | 			ExceptionListener exceptionListener = null;
  | 
  | 			while (!shutdown) {
  | 				if (!initialised) {
  | 					try {
  | 						connection = connectionFactory.createConnection();
  | 						exceptionListener = new ExceptionListener() {
  | 							public void onException(JMSException ex) {
  | 								LOG.error("Received connection exception", ex);
  | 								recycle = true;
  | 							}
  | 						};
  | 						connection.setExceptionListener(exceptionListener);
  | 						session = connection.createSession(false,
  | 								Session.AUTO_ACKNOWLEDGE);
  | 						producer = session.createProducer(queue);
  | 						LOG.info(id + " initialised");
  | 						initialised = true;
  | 					} catch (JMSException ex) {
  | 						LOG.error("Caught exception during initialisation", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 				if (isRecycle()) {
  | 					JMSHelper.close(producer);
  | 					JMSHelper.close(session);
  | 					JMSHelper.close(connection);
  | 					initialised = false;
  | 					recycle = false;
  | 				}
  | 				if (initialised && (!recycle) && (!shutdown)) {
  | 					try {
  | 						Thread.sleep(1000);
  | 						Message message = session
  | 								.createTextMessage("This is a test");
  | 						producer.send(message);
  | 						LOG.info(id + " dispatched message");
  | 					} catch (Exception ex) {
  | 						LOG.error("Caught exception during send", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 			}
  | 		}
  | 
  | 		public void shutdown() {
  | 			LOG.info(id + " is shutting down");
  | 			recycle = true;
  | 			shutdown = true;
  | 		}
  | 	}
  | 
  | 	static class JMSHelper {
  | 		public static void close(Connection connection) {
  | 			if (connection != null) {
  | 				try {
  | 					connection.close();
  | 				} catch (Exception ex) {
  | 					LOG.error("Caught exception when closing connection", ex);
  | 				}
  | 				connection = null;
  | 			}
  | 		}
  | 
  | 		public static void close(MessageConsumer consumer) {
  | 			if (consumer != null) {
  | 				try {
  | 					consumer.close();
  | 				} catch (Exception ex) {
  | 					LOG.error("Caught exception when closing consumer", ex);
  | 				}
  | 			}
  | 			consumer = null;
  | 		}
  | 
  | 		public static void close(MessageProducer producer) {
  | 			if (producer != null) {
  | 				try {
  | 					producer.close();
  | 				} catch (Exception ex) {
  | 					LOG.error("Caught exception when closing producer", ex);
  | 				}
  | 			}
  | 			producer = null;
  | 		}
  | 
  | 		public static void close(Session session) {
  | 			if (session != null) {
  | 				try {
  | 					session.close();
  | 				} catch (Exception ex) {
  | 					LOG.error("Caught exception when closing session", ex);
  | 				}
  | 			}
  | 			session = null;
  | 		}
  | 	}
  | 
  | 	class ListenerManagerThread extends Thread {
  | 		private ConnectionFactory connectionFactory;
  | 
  | 		private String id;
  | 
  | 		private boolean initialised = false;
  | 
  | 		private MessageListener messageListener;
  | 
  | 		private Queue queue;
  | 
  | 		private boolean recycle = false;
  | 
  | 		private boolean shutdown = false;
  | 
  | 		public ListenerManagerThread(ConnectionFactory connectionFactory,
  | 				Queue queue, MessageListener messageListener, String id) {
  | 			super();
  | 			this.connectionFactory = connectionFactory;
  | 			this.queue = queue;
  | 			this.messageListener = messageListener;
  | 			this.id = id;
  | 			this.setName(id);
  | 		}
  | 
  | 		private boolean isRecycle() {
  | 			return recycle;
  | 		}
  | 
  | 		public void run() {
  | 			Connection connection = null;
  | 			Session session = null;
  | 			MessageConsumer consumer = null;
  | 			ExceptionListener exceptionListener = null;
  | 
  | 			while (!shutdown) {
  | 				if (!initialised) {
  | 					try {
  | 						connection = connectionFactory.createConnection();
  | 						exceptionListener = new ExceptionListener() {
  | 							public void onException(JMSException ex) {
  | 								LOG.error("Received connection exception", ex);
  | 								recycle = true;
  | 							}
  | 						};
  | 						connection.setExceptionListener(exceptionListener);
  | 						session = connection.createSession(false,
  | 								Session.AUTO_ACKNOWLEDGE);
  | 						consumer = session.createConsumer(queue);
  | 						consumer.setMessageListener(messageListener);
  | 						connection.start();
  | 						LOG.info(id + " initialised");
  | 						initialised = true;
  | 					} catch (JMSException ex) {
  | 						LOG.error("Caught exception during initialisation", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 				if (isRecycle()) {
  | 					JMSHelper.close(consumer);
  | 					JMSHelper.close(session);
  | 					JMSHelper.close(connection);
  | 					initialised = false;
  | 					recycle = false;
  | 				}
  | 				try {
  | 					Thread.sleep(1000);
  | 				} catch (InterruptedException ex) {
  | 					LOG.error("Caught exception during sleep");
  | 				}
  | 			}
  | 		}
  | 
  | 		public void shutdown() {
  | 			LOG.info(id + " is shutting down");
  | 			recycle = true;
  | 			shutdown = true;
  | 		}
  | 	}
  | 
  | 	class SimpleListener implements MessageListener {
  | 
  | 		private String id;
  | 
  | 		public SimpleListener(String id) {
  | 			super();
  | 			this.id = id;
  | 		}
  | 
  | 		public void onMessage(Message message) {
  | 			LOG.info(id + " received message");
  | 		}
  | 
  | 	}
  | 
  | 	private static final Log LOG = LogFactory.getLog(ReconnectTest.class);
  | 
  | 	public static void main(String[] args) {
  | 		ReconnectTest test = new ReconnectTest();
  | 
  | 		try {
  | 			test.start();
  | 		} catch (Throwable ex) {
  | 			LOG.error("Caught exception in main", ex);
  | 		}
  | 	}
  | 
  | 	private void start() throws Exception {
  | 		// Setup connection 1
  | 		Hashtable properties1 = new Hashtable();
  | 		properties1.put(Context.INITIAL_CONTEXT_FACTORY,
  | 				"org.jnp.interfaces.NamingContextFactory");
  | 		properties1.put(Context.URL_PKG_PREFIXES,
  | 				"org.jboss.naming:org.jnp.interfaces");
  | 		properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  | 		properties1.put(Context.SECURITY_PRINCIPAL, "admin");
  | 		properties1.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  | 		ConnectionFactory connectionFactory1 = null;
  | 		Queue queue1 = null;
  | 		Context context1 = null;
  | 
  | 		context1 = new InitialContext(properties1);
  | 		connectionFactory1 = (ConnectionFactory) context1
  | 				.lookup("ConnectionFactory");
  | 		queue1 = (Queue) context1.lookup("/queue/testDistributedQueue");
  | 
  | 		MessageListener listener1 = new SimpleListener("Listener.1");
  | 		ListenerManagerThread manager1 = new ListenerManagerThread(
  | 				connectionFactory1, queue1, listener1, "ListenerManager.1");
  | 		manager1.start();
  | 
  | 		DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1,
  | 				queue1, "Dispatcher.1");
  | 		dispatcher1.start();
  | 
  | 		Thread.sleep(Long.MAX_VALUE);
  | 
  | 		manager1.shutdown();
  | 		manager1.join();
  | 
  | 		dispatcher1.shutdown();
  | 		dispatcher1.join();
  | 
  | 		context1.close();
  | 	}
  | }
  | 

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4024981#4024981

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4024981



More information about the jboss-user mailing list