[jboss-user] [JBoss Messaging] - Re: Clustered temporary queue problem

nyeste do-not-reply at jboss.com
Mon Apr 27 05:21:49 EDT 2009


The client, which creates a temporary queue, registers its name in JBoss Cache and listens for messages:

  | package hu.molaris.client;
  | 
  | import java.util.Properties;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.jms.TextMessage;
  | import javax.naming.InitialContext;
  | 
  | /**
  |  * Stand-alone JMS client: creates a temporary queue, registers the queue's name
  |  * under {@link #USER_NAME} through {@code ICommunicationService}, and prints
  |  * every text message that is delivered to the queue while it waits.
  |  */
  | public class Client implements MessageListener {
  |     private static final String USER_NAME = "testUser";
  | 
  |     public static void main(String[] args) throws Exception {
  |         new Client().run();
  |     }
  | 
  |     /**
  |      * Prints the text of a received message. Any failure (including a message
  |      * that is not a {@link TextMessage}) is reported on stderr and otherwise
  |      * ignored so that the listener keeps running.
  |      *
  |      * @param message the message delivered to the temporary queue
  |      */
  |     @Override
  |     public void onMessage(Message message) {
  |         try {
  |             TextMessage receivedMessage = (TextMessage) message;
  |             String receivedText = receivedMessage.getText();
  | 
  |             System.out.println("Received message: " + receivedText);
  |         } catch (Exception e) {
  |             e.printStackTrace();
  |         }
  |     }
  | 
  |     /**
  |      * Connects to the cluster, creates and registers the temporary queue, then
  |      * waits (100 ticks of 5 seconds) for messages before shutting down.
  |      * Errors are printed to stderr; resources are always released.
  |      */
  |     public void run() {
  |         Connection connection = null;
  |         Session session = null;
  |         InitialContext ic = null;
  | 
  |         try {
  |             Properties props = new Properties();
  |             props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
  |             props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200");
  |             props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
  | 
  |             ic = new InitialContext(props);
  |             ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory");
  | 
  |             connection = cf.createConnection();
  |             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  | 
  |             Queue temporaryQueue = session.createTemporaryQueue();
  |             MessageConsumer consumer = session.createConsumer(temporaryQueue);
  | 
  |             /*
  |              * Registering temporary queue name into JBoss Cache
  |              */
  |             ICommunicationService commService = (ICommunicationService) ic.lookup("CommunicationSession/remote");
  |             commService.registerClient(Client.USER_NAME, temporaryQueue.getQueueName());
  | 
  |             connection.start();
  |             consumer.setMessageListener(this);
  | 
  |             System.out.println("Waiting for messages...");
  |             for (int i = 0; i < 100; i++) {
  |                 Thread.sleep(5000);
  |                 System.out.println("tick.");
  |             }
  |         } catch (Exception e) {
  |             e.printStackTrace();
  |         } finally {
  |             closeAll(session, connection, ic);
  |         }
  |         System.out.println("Bye");
  |     }
  | 
  |     /**
  |      * Closes each resource independently. A resource that was never created
  |      * (null) is skipped, and a failure while closing one resource does not
  |      * prevent the remaining ones from being closed.
  |      */
  |     private static void closeAll(Session session, Connection connection, InitialContext ic) {
  |         try {
  |             if (session != null) {
  |                 session.close();
  |             }
  |         } catch (Exception e) {
  |             e.printStackTrace();
  |         }
  |         try {
  |             if (connection != null) {
  |                 connection.close();
  |             }
  |         } catch (Exception e) {
  |             e.printStackTrace();
  |         }
  |         try {
  |             if (ic != null) {
  |                 ic.close();
  |             }
  |         } catch (Exception e) {
  |             e.printStackTrace();
  |         }
  |     }
  | 
  | }
  | 

The MDB which receives messages and forwards them to the temp queue:

  | package hu.molaris.mdb;
  | 
  | import javax.ejb.ActivationConfigProperty;
  | import javax.ejb.MessageDriven;
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.jms.TextMessage;
  | import javax.naming.InitialContext;
  | 
  | @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
  | 	@ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/CommunicationQueue") })
  | public class MessageReceiverMDB implements MessageListener {
  | 
  |     private static final String USER_NAME = "testUser";
  | 
  |     /**
  |      * @see MessageListener#onMessage(Message)
  |      */
  |     public void onMessage(Message message) {
  | 	Connection conn = null;
  | 	Session session = null;
  | 	try {
  | 	    TextMessage tm = (TextMessage) message;
  | 
  | 	    String text = tm.getText();
  | 	    System.out.println("message " + text + " received");
  | 
  | 	    InitialContext ic = new InitialContext();
  | 	    ConnectionFactory cf = (ConnectionFactory) ic.lookup("java:/JmsXA");
  | 
  | 	    conn = cf.createConnection();
  | 	    conn.start();
  | 	    session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
  | 
  | 	    /*
  | 	     * Reading temporary queue name from JBoss Cache
  | 	     */
  | 	    CacheDataHandler cacheDataHandler = ComServCacheDataHandler.getInstance();
  | 	    String queue = (String) cacheDataHandler.getData(MessageReceiverMDB.USER_NAME);
  | 
  | 	    Queue forwardTo = session.createQueue(queue);
  | 	    MessageProducer producer = session.createProducer(forwardTo);
  | 	    TextMessage forward = session.createTextMessage("Message from device: " + text);
  | 
  | 	    producer.send(forward);
  | 	    producer.close();
  | 
  | 	    ic.close();
  | 	} catch (Exception e) {
  | 	    e.printStackTrace();
  | 	    System.out.println("The Message Driven Bean failed!");
  | 	} finally {
  | 	    try {
  | 		session.close();
  | 		conn.close();
  | 	    } catch (JMSException e) {
  | 		e.printStackTrace();
  | 	    }
  | 	}
  |     }
  | }
  | 

The server, which represents the original sender of the messages:

  | package hu.molaris.sender;
  | 
  | import java.util.Properties;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.JMSException;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.jms.TextMessage;
  | import javax.naming.InitialContext;
  | 
  | /**
  |  * Stand-alone producer that sends a fixed text message to
  |  * {@code queue/CommunicationQueue}; {@link #main(String[])} does this ten
  |  * times, each time with a fresh connection.
  |  */
  | public class Sender {
  |     private static final String QUEUE_NAME = "queue/CommunicationQueue";
  | 
  |     public static void main(String[] args) {
  |         for (int i = 0; i < 10; i++) {
  |             new Sender().run();
  |         }
  |     }
  | 
  |     /**
  |      * Looks up the connection factory and the target queue, sends one
  |      * "test message" and releases all resources. Errors are printed to stderr
  |      * and never propagate, so a failed attempt does not abort the caller's loop.
  |      */
  |     public void run() {
  |         Connection connection = null;
  |         Session session = null;
  |         InitialContext ic = null;
  | 
  |         try {
  |             Properties props = new Properties();
  |             props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
  |             props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200");
  |             props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
  | 
  |             ic = new InitialContext(props);
  | 
  |             ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory");
  |             Queue queue = (Queue) ic.lookup(Sender.QUEUE_NAME);
  |             System.out.println("Queue " + Sender.QUEUE_NAME + " exists");
  | 
  |             connection = cf.createConnection();
  |             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  |             MessageProducer sender = session.createProducer(queue);
  | 
  |             TextMessage message = session.createTextMessage("test message");
  | 
  |             connection.start();
  |             sender.send(message);
  | 
  |             System.out.println("The " + message.getText() + " message was successfully sent to the " + queue.getQueueName() + " queue");
  |         } catch (Exception e) {
  |             e.printStackTrace();
  |         } finally {
  |             // Close each resource on its own: a null (never created) resource is
  |             // skipped and one failing close does not skip the others.
  |             try {
  |                 if (session != null) {
  |                     session.close();
  |                 }
  |             } catch (JMSException e) {
  |                 e.printStackTrace();
  |             }
  |             try {
  |                 if (connection != null) {
  |                     connection.close();
  |                 }
  |             } catch (JMSException e) {
  |                 e.printStackTrace();
  |             }
  |             try {
  |                 if (ic != null) {
  |                     ic.close();
  |                 }
  |             } catch (javax.naming.NamingException e) {
  |                 e.printStackTrace();
  |             }
  |         }
  |     }
  | }
  | 

If the cluster consists of 1 node, there is no problem. But if it contains 2 or more, I get the error.

Thanks!


View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4226946#4226946

Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4226946



More information about the jboss-user mailing list