[jboss-user] [JBoss Messaging] - Receiving Messages from multiple nodes in a Cluster
mayankmit2002
do-not-reply at jboss.com
Mon Aug 17 23:54:12 EDT 2009
My application is that, in which I've a session bean which is called by the client and an event is fired from that bean on a topic. My JMS remote client is listening to that topic and performs some operations on the type of event fired.
The whole solution works fine if we have a single node in the cluster, but problem occurs when we have multiple nodes in the cluster. The scenario is :
1. Multiple nodes in the cluster, NOICLT22560 and NOICLT13294 and partition name is NOICLT22560_PARTITION.
2. The session bean responsible for firing the event is clustered and is firing the events on the topic using JmsXA.
Being clustered, the call from the client can be executed on any of the node and thus, event can be fired from any node.
But now, what I want is a type of client that can bound to the partition name, instead of the node name to receive events from any of the node in the cluster.
I tried a lot, but my client gets connected to any one of the node and listen events from that node only.
Here is my client code:
|
| package com.abc.jbmtest.client;
|
|
| import java.util.Properties;
|
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.ExceptionListener;
| import javax.jms.JMSException;
| import javax.jms.Message;
| import javax.jms.MessageConsumer;
| import javax.jms.MessageListener;
| import javax.jms.ObjectMessage;
| import javax.jms.Session;
| import javax.jms.TextMessage;
| import javax.jms.Topic;
| import javax.naming.CommunicationException;
| import javax.naming.Context;
| import javax.naming.InitialContext;
| import javax.naming.NamingException;
|
|
| public class MessageListenerClient
| implements MessageListener, ExceptionListener
| {
|
|
| public MessageListenerClient()
| {
| final Properties props = new Properties();
| props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
| props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
| props.put(Context.PROVIDER_URL, "NOICLT13274:1100,NOICLT22560:1100");
| // props.put("jnp.partitionname", "NOICLT22560_PARTITION");
| // props.put(Context.PROVIDER_URL, "jnp://NOICLT13274:1099,jnp://NOICLT22560:1099 ");
|
| try
| {
| final Context context = new InitialContext(props);
| ConnectionFactory mTopicConnectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory");
| System.out.println(mTopicConnectionFactory);
| Topic mTopic = (Topic) context.lookup("topic/CMSPublic");
|
|
| Connection mTopicConnection = mTopicConnectionFactory.createConnection();
| Session mTopicSession = mTopicConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
| // Create subscriber
| MessageConsumer mSubscriber = mTopicSession.createConsumer(mTopic);
| mSubscriber.setMessageListener(this);
| mTopicConnection.setExceptionListener(this);
|
| mTopicConnection.start();
| }
| catch (final CommunicationException anException)
| {
| System.err.println("Server is not avaliable!!! \n Trying to reconnect the server!! " + anException.getMessage());
| }
| catch (final NamingException anException)
| {
| System.err.println("Server is not avaliable!!! \n Trying to reconnect the server!! " + anException.getMessage());
| // stopJMSEvents();
| // startJMSEvents();
| }
| catch (final JMSException anException)
| {
| System.err.println("Server is not avaliable!!! \n Trying to reconnect the server!! " + anException.getMessage());
|
| }
|
| catch (Exception finalException)
| {
| System.err.println("Exception while starting JMS event listener. You may not be able to receive event, please restart your application");
| finalException.printStackTrace();
| }
|
|
| }
|
|
|
| @Override
| public void onMessage (Message aMessage)
| {
|
| try
| {
| if (aMessage instanceof TextMessage)
| {
| TextMessage message;
| message = (TextMessage) aMessage;
| System.out.println("Message Reciieved ------ " + message.getText());
| }
| else if (aMessage instanceof ObjectMessage)
| {
| ObjectMessage message = (ObjectMessage) aMessage;
| System.out.println("Message Reciieved ------ " + message.getObject());
| }
| else
| {
| System.out.println("Message Reciieved ------ " + aMessage.getClass());
| }
| }
| catch (Exception exception)
| {
| exception.printStackTrace();
| }
|
| }
|
|
| @Override
| public void onException (JMSException anArg0)
| {
| anArg0.printStackTrace();
|
| }
|
|
| public static void main (String[] args) throws Exception
| {
| new Thread(new Runnable()
| {
| /**
| * @see java.lang.Runnable#run()
| */
| @Override
| public void run ()
| {
| new MessageListenerClient();
|
| }
| }).start();
| System.in.read();
| System.out.println("------------Exiting Client Application-------------------------");
| }
| }
|
View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4250040#4250040
Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4250040
More information about the jboss-user
mailing list