The client, which creates a temporary queue, registers its name in JBoss Cache, and
listens for messages:
| package hu.molaris.client;
|
| import java.util.Properties;
|
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.Message;
| import javax.jms.MessageConsumer;
| import javax.jms.MessageListener;
| import javax.jms.Queue;
| import javax.jms.Session;
| import javax.jms.TextMessage;
| import javax.naming.InitialContext;
|
| public class Client implements MessageListener {
| private static final String USER_NAME = "testUser";
|
| public static void main(String[] args) throws Exception {
| new Client().run();
| }
|
| @Override
| public void onMessage(Message message) {
| try {
| String receivedText = null;
| TextMessage receivedMessage = (TextMessage) message;
| receivedText = receivedMessage.getText();
|
| System.out.println("Received message: " + receivedText);
| } catch (Exception e) {
| e.printStackTrace();
| }
| }
|
| public void run() {
| Connection connection = null;
| Session session = null;
| Queue temporaryQueue;
| InitialContext ic;
|
| try {
| Properties props = new Properties();
| props.put("java.naming.factory.initial",
"org.jnp.interfaces.NamingContextFactory");
| props.put("java.naming.provider.url",
"avl-32-2:1200,avl-32-3:1200");
| props.put("java.naming.factory.url.pkgs",
"org.jboss.naming:org.jnp.interfaces");
|
| ic = new InitialContext(props);
| ConnectionFactory cf = (ConnectionFactory)
ic.lookup("/ClusteredConnectionFactory");
|
| connection = cf.createConnection();
| session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
| temporaryQueue = session.createTemporaryQueue();
| MessageConsumer consumer = session.createConsumer(temporaryQueue);
|
| /*
| * Registering temporary queue name into JBoss Cache
| */
| ICommunicationService commService = (ICommunicationService)
ic.lookup("CommunicationSession/remote");
| commService.registerClient(Client.USER_NAME, temporaryQueue.getQueueName());
|
| connection.start();
| consumer.setMessageListener(this);
|
| System.out.println("Waiting for messages...");
| for (int i = 0; i < 100; i++) {
| Thread.sleep(5000);
| System.out.println("tick.");
| }
| } catch (Exception e) {
| e.printStackTrace();
| } finally {
| try {
| session.close();
| connection.close();
| } catch (Exception e) {
| e.printStackTrace();
| }
| }
| System.out.println("Bye");
| }
|
| }
|
The MDB which receives messages and forwards them to the temp queue:
| package hu.molaris.mdb;
|
| import javax.ejb.ActivationConfigProperty;
| import javax.ejb.MessageDriven;
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.JMSException;
| import javax.jms.Message;
| import javax.jms.MessageListener;
| import javax.jms.MessageProducer;
| import javax.jms.Queue;
| import javax.jms.Session;
| import javax.jms.TextMessage;
| import javax.naming.InitialContext;
|
| /**
|  * Message-driven bean listening on {@code queue/CommunicationQueue}. Each
|  * received text message is forwarded to the queue whose name is stored in the
|  * cache under {@link #USER_NAME}.
|  */
| @MessageDriven(activationConfig = {
|         @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
|         @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/CommunicationQueue") })
| public class MessageReceiverMDB implements MessageListener {
|
|     private static final String USER_NAME = "testUser";
|
|     /**
|      * Forwards the text of {@code message} to the client's registered queue.
|      * Failures are logged and not rethrown.
|      *
|      * @param message the incoming message; it is cast to {@link TextMessage}
|      * @see MessageListener#onMessage(Message)
|      */
|     public void onMessage(Message message) {
|         Connection conn = null;
|         Session session = null;
|         InitialContext ic = null;
|         try {
|             TextMessage tm = (TextMessage) message;
|
|             String text = tm.getText();
|             System.out.println("message " + text + " received");
|
|             ic = new InitialContext();
|             ConnectionFactory cf = (ConnectionFactory) ic.lookup("java:/JmsXA");
|
|             conn = cf.createConnection();
|             conn.start();
|             session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
|             /*
|              * Reading temporary queue name from JBoss Cache
|              */
|             CacheDataHandler cacheDataHandler = ComServCacheDataHandler.getInstance();
|             String queue = (String) cacheDataHandler.getData(MessageReceiverMDB.USER_NAME);
|             if (queue == null) {
|                 // Fail with a clear reason instead of passing null to createQueue.
|                 throw new IllegalStateException(
|                         "No queue name registered in the cache for user " + MessageReceiverMDB.USER_NAME);
|             }
|
|             // NOTE(review): the cached name refers to a temporary queue created on the
|             // client's own connection; whether createQueue(name) can resolve it from
|             // another cluster node is provider-specific - confirm against the
|             // messaging provider documentation.
|             Queue forwardTo = session.createQueue(queue);
|             MessageProducer producer = session.createProducer(forwardTo);
|             TextMessage forward = session.createTextMessage("Message from device: " + text);
|
|             producer.send(forward);
|             producer.close();
|         } catch (Exception e) {
|             e.printStackTrace();
|             System.out.println("The Message Driven Bean failed!");
|         } finally {
|             releaseResources(session, conn, ic);
|         }
|     }
|
|     /**
|      * Closes whichever resources were actually created. Each close is attempted
|      * independently, so a null reference (setup failed early) or a failing
|      * close cannot prevent the remaining resources from being released.
|      */
|     private static void releaseResources(Session session, Connection conn, InitialContext ic) {
|         if (session != null) {
|             try {
|                 session.close();
|             } catch (JMSException e) {
|                 e.printStackTrace();
|             }
|         }
|         if (conn != null) {
|             try {
|                 conn.close();
|             } catch (JMSException e) {
|                 e.printStackTrace();
|             }
|         }
|         if (ic != null) {
|             try {
|                 ic.close();
|             } catch (javax.naming.NamingException e) {
|                 e.printStackTrace();
|             }
|         }
|     }
| }
|
The server, which represents the original sender of the messages:
| package hu.molaris.sender;
|
| import java.util.Properties;
|
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.JMSException;
| import javax.jms.MessageProducer;
| import javax.jms.Queue;
| import javax.jms.Session;
| import javax.jms.TextMessage;
| import javax.naming.InitialContext;
|
| /**
|  * Stand-alone test sender. It looks up {@code queue/CommunicationQueue} in the
|  * cluster's JNDI and sends one text message per {@link #run()} call.
|  */
| public class Sender {
|     private static final String QUEUE_NAME = "queue/CommunicationQueue";
|
|     /** Sends ten messages, each through a fresh connection. */
|     public static void main(String[] args) {
|         for (int i = 0; i < 10; i++) {
|             new Sender().run();
|         }
|     }
|
|     /**
|      * Sends a single "test message" to {@link #QUEUE_NAME}. Failures are printed
|      * and not rethrown; all resources are released before returning, whether or
|      * not setup succeeded.
|      */
|     public void run() {
|         Connection connection = null;
|         Session session = null;
|         InitialContext ic = null;
|
|         try {
|             Properties props = new Properties();
|             props.put("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
|             props.put("java.naming.provider.url", "avl-32-2:1200,avl-32-3:1200");
|             props.put("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
|
|             ic = new InitialContext(props);
|
|             ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ClusteredConnectionFactory");
|             Queue queue = (Queue) ic.lookup(Sender.QUEUE_NAME);
|             System.out.println("Queue " + Sender.QUEUE_NAME + " exists");
|
|             connection = cf.createConnection();
|             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|             MessageProducer sender = session.createProducer(queue);
|
|             TextMessage message = session.createTextMessage("test message");
|
|             connection.start();
|             sender.send(message);
|
|             System.out.println("The " + message.getText()
|                     + " message was successfully sent to the " + queue.getQueueName() + " queue");
|         } catch (Exception e) {
|             e.printStackTrace();
|         } finally {
|             releaseResources(session, connection, ic);
|         }
|     }
|
|     /**
|      * Closes whichever resources were actually created. Each close is attempted
|      * independently, so a null reference (setup failed early) or a failing
|      * close cannot prevent the remaining resources from being released.
|      */
|     private static void releaseResources(Session session, Connection connection, InitialContext ic) {
|         if (session != null) {
|             try {
|                 session.close();
|             } catch (JMSException e) {
|                 e.printStackTrace();
|             }
|         }
|         if (connection != null) {
|             try {
|                 connection.close();
|             } catch (JMSException e) {
|                 e.printStackTrace();
|             }
|         }
|         if (ic != null) {
|             try {
|                 ic.close();
|             } catch (javax.naming.NamingException e) {
|                 e.printStackTrace();
|             }
|         }
|     }
| }
|
If the cluster consists of 1 node, there is no problem. But if it contains 2 or more, I get
the error.
Thanks!
View the original post :
http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4226946#...
Reply to the post :
http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&a...