[jboss-user] [JBoss Messaging] - Re: Connecting to two JBoss messaging servers causes interference
bander
do-not-reply at jboss.com
Wed Feb 21 23:39:02 EST 2007
After some re-testing of this issue I can confirm it is still present.
I've developed a new test case that creates a message producer and consumer on two separate JBoss Messaging servers. By continually shutting down and restarting each JBoss Messaging server I can eventually cause the shutting down of one server to stop the message listeners on the *other* server.
This test case also demonstrates the same reconnection issue that I raised in this post: http://www.jboss.org/index.html?module=bb&op=viewtopic&t=102233
Our JBoss Messaging servers have different ServerPeerIDs and StoreIds as suggested elsewhere.
To run the test case, start both JBoss Messaging servers then start the test case. A producer will be created on each server and will start dispatching messages to a queue. A message listener on the queue will acknowledge the dispatched message. Stop one of the servers for a while then restart it. The producer and consumer on that server should start up again. Quite often they do not.
| import java.util.Hashtable;
|
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.ExceptionListener;
| import javax.jms.JMSException;
| import javax.jms.Message;
| import javax.jms.MessageConsumer;
| import javax.jms.MessageListener;
| import javax.jms.MessageProducer;
| import javax.jms.Queue;
| import javax.jms.Session;
| import javax.naming.Context;
| import javax.naming.InitialContext;
|
| import org.apache.commons.logging.Log;
| import org.apache.commons.logging.LogFactory;
|
| public class MultipleServerReconnectTest {
|
| /**
|  * Worker thread that sends a text message to a queue about once a second.
|  * When initialisation or a send fails, or the connection's exception
|  * listener fires, the producer, session and connection are closed and
|  * recreated on a later loop iteration.
|  */
| class DispatcherThread extends Thread {
|     /** Pause before each send and before each reconnection attempt. */
|     private static final long PAUSE_MILLIS = 1000;
|
|     private ConnectionFactory connectionFactory;
|
|     private String id;
|
|     // Only read and written inside run().
|     private boolean initialised = false;
|
|     private Queue queue;
|
|     // Set from the ExceptionListener callback and from shutdown(), both of
|     // which run outside this thread; volatile makes the writes visible to
|     // the loop in run().
|     private volatile boolean recycle = false;
|
|     private volatile boolean shutdown = false;
|
|     /**
|      * @param connectionFactory factory used for every (re)connection
|      * @param queue destination the messages are sent to
|      * @param id name used in log output and as the thread name
|      */
|     public DispatcherThread(ConnectionFactory connectionFactory,
|             Queue queue, String id) {
|         super();
|         this.connectionFactory = connectionFactory;
|         this.queue = queue;
|         this.id = id;
|         this.setName(id);
|     }
|
|     private boolean isRecycle() {
|         return recycle;
|     }
|
|     /**
|      * Loops until shutdown is requested: connect if necessary, tear down
|      * when a recycle is flagged, otherwise send one message. The JMS
|      * resources are always closed before the thread ends.
|      */
|     public void run() {
|         Connection connection = null;
|         Session session = null;
|         MessageProducer producer = null;
|
|         try {
|             while (!shutdown) {
|                 if (!initialised) {
|                     try {
|                         connection = connectionFactory.createConnection();
|                         connection.setExceptionListener(new ExceptionListener() {
|                             public void onException(JMSException ex) {
|                                 LOG.error("Received connection exception", ex);
|                                 recycle = true;
|                             }
|                         });
|                         session = connection.createSession(false,
|                                 Session.AUTO_ACKNOWLEDGE);
|                         producer = session.createProducer(queue);
|                         LOG.info(id + " initialised");
|                         initialised = true;
|                     } catch (JMSException ex) {
|                         LOG.error("Caught exception during initialisation", ex);
|                         recycle = true;
|                     }
|                 }
|                 if (isRecycle()) {
|                     closeResources(producer, session, connection);
|                     producer = null;
|                     session = null;
|                     connection = null;
|                     initialised = false;
|                     recycle = false;
|                     // Without this pause a server that is down makes the
|                     // loop retry createConnection() as fast as it can fail.
|                     if (!shutdown) {
|                         pause();
|                     }
|                 }
|                 if (initialised && (!recycle) && (!shutdown)) {
|                     try {
|                         Thread.sleep(PAUSE_MILLIS);
|                         Message message = session
|                                 .createTextMessage("This is a test");
|                         producer.send(message);
|                         LOG.info(id + " dispatched message");
|                     } catch (InterruptedException ex) {
|                         // An interrupt is a request to stop, not a send failure.
|                         Thread.currentThread().interrupt();
|                         shutdown = true;
|                     } catch (Exception ex) {
|                         LOG.error("Caught exception during send", ex);
|                         recycle = true;
|                     }
|                 }
|             }
|         } finally {
|             // The loop exits as soon as shutdown is set, so the recycle
|             // branch may never run for the last connection; close it here.
|             closeResources(producer, session, connection);
|         }
|     }
|
|     /** Sleeps for PAUSE_MILLIS; an interrupt is treated as a shutdown request. */
|     private void pause() {
|         try {
|             Thread.sleep(PAUSE_MILLIS);
|         } catch (InterruptedException ex) {
|             Thread.currentThread().interrupt();
|             shutdown = true;
|         }
|     }
|
|     /** Closes whichever of the given resources is non-null, logging any failure. */
|     private void closeResources(MessageProducer producer, Session session,
|             Connection connection) {
|         if (producer != null) {
|             try {
|                 producer.close();
|             } catch (Exception ex) {
|                 LOG.error("Caught exception during producer close",
|                         ex);
|             }
|         }
|         if (session != null) {
|             try {
|                 session.close();
|             } catch (Exception ex) {
|                 LOG.error("Caught exception during session close",
|                         ex);
|             }
|         }
|         if (connection != null) {
|             try {
|                 connection.close();
|             } catch (Exception ex) {
|                 LOG.error(
|                         "Caught exception during connection close",
|                         ex);
|             }
|         }
|     }
|
|     /**
|      * Asks the thread to stop. Only sets flags; callers that need to wait
|      * for the thread to finish must join() it.
|      */
|     public void shutdown() {
|         LOG.info(id + " is shutting down");
|         recycle = true;
|         shutdown = true;
|     }
| }
|
| /**
|  * Worker thread that keeps a message listener attached to a queue.
|  * It checks once a second whether a recycle has been flagged (by a failed
|  * initialisation or by the connection's exception listener) and, if so,
|  * closes the consumer, session and connection and recreates them.
|  */
| class ListenerManagerThread extends Thread {
|     private ConnectionFactory connectionFactory;
|
|     private String id;
|
|     // Only read and written inside run().
|     private boolean initialised = false;
|
|     private MessageListener messageListener;
|
|     private Queue queue;
|
|     // Set from the ExceptionListener callback and from shutdown(), both of
|     // which run outside this thread; volatile makes the writes visible to
|     // the loop in run().
|     private volatile boolean recycle = false;
|
|     private volatile boolean shutdown = false;
|
|     /**
|      * @param connectionFactory factory used for every (re)connection
|      * @param queue destination the consumer is created on
|      * @param messageListener listener registered on each new consumer
|      * @param id name used in log output and as the thread name
|      */
|     public ListenerManagerThread(ConnectionFactory connectionFactory,
|             Queue queue, MessageListener messageListener, String id) {
|         super();
|         this.connectionFactory = connectionFactory;
|         this.queue = queue;
|         this.messageListener = messageListener;
|         this.id = id;
|         this.setName(id);
|     }
|
|     private boolean isRecycle() {
|         return recycle;
|     }
|
|     /**
|      * Loops until shutdown is requested: connect and register the listener
|      * if necessary, tear down when a recycle is flagged, then sleep for a
|      * second. The JMS resources are always closed before the thread ends.
|      */
|     public void run() {
|         Connection connection = null;
|         Session session = null;
|         MessageConsumer consumer = null;
|
|         try {
|             while (!shutdown) {
|                 if (!initialised) {
|                     try {
|                         connection = connectionFactory.createConnection();
|                         connection.setExceptionListener(new ExceptionListener() {
|                             public void onException(JMSException ex) {
|                                 LOG.error("Received connection exception", ex);
|                                 recycle = true;
|                             }
|                         });
|                         session = connection.createSession(false,
|                                 Session.AUTO_ACKNOWLEDGE);
|                         consumer = session.createConsumer(queue);
|                         consumer.setMessageListener(messageListener);
|                         connection.start();
|                         LOG.info(id + " initialised");
|                         initialised = true;
|                     } catch (JMSException ex) {
|                         LOG.error("Caught exception during initialisation", ex);
|                         recycle = true;
|                     }
|                 }
|                 if (isRecycle()) {
|                     closeResources(consumer, session, connection);
|                     consumer = null;
|                     session = null;
|                     connection = null;
|                     initialised = false;
|                     recycle = false;
|                 }
|                 try {
|                     Thread.sleep(1000);
|                 } catch (InterruptedException ex) {
|                     LOG.error("Caught exception during sleep", ex);
|                     // Keep the interrupt status and stop; carrying on with
|                     // the flag set would make every later sleep fail at once.
|                     Thread.currentThread().interrupt();
|                     shutdown = true;
|                 }
|             }
|         } finally {
|             // shutdown() normally arrives while this thread is asleep, so
|             // the loop exits without passing through the recycle branch;
|             // detach the listener and close the connection here.
|             closeResources(consumer, session, connection);
|         }
|     }
|
|     /**
|      * Detaches the listener and closes whichever of the given resources is
|      * non-null, logging any failure.
|      */
|     private void closeResources(MessageConsumer consumer, Session session,
|             Connection connection) {
|         if (consumer != null) {
|             try {
|                 consumer.setMessageListener(null);
|                 consumer.close();
|             } catch (Exception ex) {
|                 LOG.error("Caught exception during consumer close",
|                         ex);
|             }
|         }
|         if (session != null) {
|             try {
|                 session.close();
|             } catch (Exception ex) {
|                 LOG.error("Caught exception during session close",
|                         ex);
|             }
|         }
|         if (connection != null) {
|             try {
|                 connection.close();
|             } catch (Exception ex) {
|                 LOG.error(
|                         "Caught exception during connection close",
|                         ex);
|             }
|         }
|     }
|
|     /**
|      * Asks the thread to stop. Only sets flags; callers that need to wait
|      * for the thread to finish must join() it.
|      */
|     public void shutdown() {
|         LOG.info(id + " is shutting down");
|         recycle = true;
|         shutdown = true;
|     }
| }
|
| class SimpleListener implements MessageListener {
|
| private String id;
|
| public SimpleListener(String id) {
| super();
| this.id = id;
| }
|
| /**
| * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
| */
| public void onMessage(Message message) {
| LOG.info(id + " received message");
| }
|
| }
|
| private static final Log LOG = LogFactory
| .getLog(MultipleServerReconnectTest.class);
|
| public static void main(String[] args) {
| MultipleServerReconnectTest test = new MultipleServerReconnectTest();
|
| try {
| test.start();
| } catch (Throwable ex) {
| LOG.error("Caught exception in main", ex);
| }
| }
|
| /**
|  * Runs the scenario: looks up a connection factory and queue on each of
|  * two servers, starts one listener manager and one dispatcher per server,
|  * lets them run for ten minutes, then stops them.
|  *
|  * The workers are asked to stop and the JNDI contexts are closed even when
|  * a lookup fails or the wait is interrupted.
|  *
|  * @throws Exception if a JNDI operation fails or the calling thread is
|  *         interrupted
|  */
| private void start() throws Exception {
|     /*
|      * If you want to run the following test case under ActiveMQ 3.2.1 then
|      * only the following properties are required:
|      *
|      * properties1.put(Context.INITIAL_CONTEXT_FACTORY,
|      * "org.activemq.jndi.ActiveMQInitialContextFactory");
|      * properties1.put(Context.PROVIDER_URL, "tcp://localhost:61616");
|      * properties2.put(Context.INITIAL_CONTEXT_FACTORY,
|      * "org.activemq.jndi.ActiveMQInitialContextFactory");
|      * properties2.put(Context.PROVIDER_URL, "tcp://localhost:61617");
|      *
|      * For ActiveMQ 4.1.0 the required context factory is
|      * org.apache.activemq.jndi.ActiveMQInitialContextFactory
|      */
|     // Setup connection 1
|     Hashtable properties1 = createJndiProperties("jnp://localhost:1099");
|
|     // Setup connection 2
|     // change the following url to point to your second jboss instance
|     Hashtable properties2 = createJndiProperties("jnp://otherhost:1099");
|
|     Context context1 = null;
|     Context context2 = null;
|     try {
|         context1 = new InitialContext(properties1);
|         ConnectionFactory connectionFactory1 = (ConnectionFactory) context1
|                 .lookup("ConnectionFactory");
|         // Make sure this queue has been configured on your jboss server
|         // (under ActiveMQ use "dynamicQueues/testQueue")
|         Queue queue1 = (Queue) context1.lookup("/queue/tc1_q1");
|
|         context2 = new InitialContext(properties2);
|         ConnectionFactory connectionFactory2 = (ConnectionFactory) context2
|                 .lookup("ConnectionFactory");
|         // Make sure this queue has been configured on your jboss server
|         // (under ActiveMQ use "dynamicQueues/testQueue")
|         Queue queue2 = (Queue) context2.lookup("/queue/tc1_q1");
|
|         MessageListener listener1 = new SimpleListener("Listener.1");
|         ListenerManagerThread manager1 = new ListenerManagerThread(
|                 connectionFactory1, queue1, listener1, "ListenerManager.1");
|         DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1,
|                 queue1, "Dispatcher.1");
|
|         MessageListener listener2 = new SimpleListener("Listener.2");
|         ListenerManagerThread manager2 = new ListenerManagerThread(
|                 connectionFactory2, queue2, listener2, "ListenerManager.2");
|         DispatcherThread dispatcher2 = new DispatcherThread(connectionFactory2,
|                 queue2, "Dispatcher.2");
|
|         try {
|             manager1.start();
|             dispatcher1.start();
|             manager2.start();
|             dispatcher2.start();
|
|             // 10 minutes
|             Thread.sleep(600000);
|         } finally {
|             // Signal every worker before joining any of them, so they
|             // wind down in parallel rather than one after another.
|             manager1.shutdown();
|             dispatcher1.shutdown();
|             manager2.shutdown();
|             dispatcher2.shutdown();
|
|             manager1.join();
|             dispatcher1.join();
|             manager2.join();
|             dispatcher2.join();
|         }
|     } finally {
|         // Nested so that a failure closing the first context cannot
|         // prevent the second from being closed.
|         try {
|             if (context1 != null) {
|                 context1.close();
|             }
|         } finally {
|             if (context2 != null) {
|                 context2.close();
|             }
|         }
|     }
| }
|
| /**
|  * Builds the JNDI environment for a JBoss naming server.
|  *
|  * @param providerUrl value stored under Context.PROVIDER_URL
|  * @return a new table holding the context factory, URL package prefixes,
|  *         provider URL and the admin/admin credentials
|  */
| private static Hashtable createJndiProperties(String providerUrl) {
|     Hashtable properties = new Hashtable();
|     properties.put(Context.INITIAL_CONTEXT_FACTORY,
|             "org.jnp.interfaces.NamingContextFactory");
|     properties.put(Context.URL_PKG_PREFIXES,
|             "org.jboss.naming:org.jnp.interfaces");
|     properties.put(Context.PROVIDER_URL, providerUrl);
|     properties.put(Context.SECURITY_PRINCIPAL, "admin");
|     properties.put(Context.SECURITY_CREDENTIALS, "admin");
|     return properties;
| }
| }
|
View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4020399#4020399
Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4020399
More information about the jboss-user
mailing list