[jboss-user] [JBoss Messaging] - Re: Clustered server preference

clebert.suconic@jboss.com do-not-reply at jboss.com
Wed Apr 16 16:47:49 EDT 2008


Just an FYI (to keep you happy): I have invested a lot of time in this issue now :-)

Load balancing on the ClusteredConnectionFactory only guarantees that each cf.createConnection() will point to a different server. Having said that, a MessageProducer will always be bound to the server its Connection is pointing to.

Also, messages are always consumed preferably by local consumers on the cluster. If there are no local consumers, messages are redistributed to the cluster.

If you need to guarantee ordering on message redistribution, take a look at this:

http://www.jboss.org/file-access/default/members/jbossmessaging/freezone/docs/userguide-1.4.1.Beta1/html_single/index.html#c_conf.orderingincluster



I have changed your example to send messages on both nodes, and after setting DefaultPreserveOrdering=true, I got the output the way you would expect.

Listener_1 received message: Test_Message_0
  | Listener_2 received message: Test_Message_1
  | Listener_1 received message: Test_Message_2
  | Listener_2 received message: Test_Message_3
  | Listener_1 received message: Test_Message_4
  | Listener_2 received message: Test_Message_5
  | Listener_1 received message: Test_Message_6
  | Listener_2 received message: Test_Message_7
  | Listener_1 received message: Test_Message_8
  | Listener_2 received message: Test_Message_9
  | Listener_1 received message: Test_Message_10
  | Listener_2 received message: Test_Message_11
  | Listener_1 received message: Test_Message_12
  | Listener_2 received message: Test_Message_13
  | Listener_1 received message: Test_Message_14
  | Listener_2 received message: Test_Message_15
  | Listener_1 received message: Test_Message_16
  | Listener_2 received message: Test_Message_17
  | Listener_1 received message: Test_Message_18
  | Listener_2 received message: Test_Message_19
  | Listener_1 received message: Test_Message_20
  | Listener_2 received message: Test_Message_21
  | Listener_1 received message: Test_Message_22
  | Listener_2 received message: Test_Message_23
  | Listener_1 received message: Test_Message_24
  | Listener_2 received message: Test_Message_25
  | Listener_1 received message: Test_Message_26
  | Listener_2 received message: Test_Message_27
  | Listener_1 received message: Test_Message_28
  | Listener_2 received message: Test_Message_29
  | Listener_1 received message: Test_Message_30
  | Listener_2 received message: Test_Message_31
  | Listener_1 received message: Test_Message_32
  | Listener_2 received message: Test_Message_33
  | Listener_1 received message: Test_Message_34
  | Listener_2 received message: Test_Message_35
  | Listener_1 received message: Test_Message_36
  | Listener_2 received message: Test_Message_37
  | Listener_1 received message: Test_Message_38
  | Listener_2 received message: Test_Message_39
  | Listener_1 received message: Test_Message_40
  | Listener_2 received message: Test_Message_41
  | Listener_1 received message: Test_Message_42
  | Listener_2 received message: Test_Message_43
  | Listener_1 received message: Test_Message_44
  | Listener_2 received message: Test_Message_45
  | Listener_1 received message: Test_Message_46
  | Listener_2 received message: Test_Message_47
  | Listener_1 received message: Test_Message_48
  | Listener_2 received message: Test_Message_49
  | 


I really hate posting full source code on the forum, but I will make an exception now:


  | 
  | 
  | import java.util.Properties;
  | 
  | import javax.jms.*;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.QueueSession;
  | import javax.jms.Session;
  | import javax.jms.TextMessage;
  | 
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | import javax.naming.NamingException;
  | 
  | 
  | /**
  |  * Class JBMTest
  |  */
  | 
  | public class JBMTest
  | {
  |     static Queue m_producerQueue           = null;
  |     static InitialContext m_initialContext_1 = null;
  |     static ConnectionFactory m_factory_1     = null;
  | 
  |     /**
  |      * Method: initialize
  |      *
  |      *
  |      * @throws NamingException
  |      */
  | 
  |     static void initialize () throws NamingException
  |     {
  |         Properties p = new Properties ();
  |         p.put (Context.PROVIDER_URL, "jnp://10.61.5.1.int:1100");
  |         p.put (Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  |         p.put (Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
  | 
  |         m_initialContext_1 = new InitialContext (p);
  |         m_factory_1        = (ConnectionFactory) m_initialContext_1.lookup ("/ClusteredConnectionFactory");
  | 
  |     
  |     }
  | 
  |     /**
  |      * Method: main
  |      *
  |      *
  |      * @param args
  |      */
  | 
  |     public static void main (String[] args)
  |     {
  |         JBMTest tester = null;
  |         try
  |         {
  |             initialize ();
  | 
  |             tester = new JBMTest ();
  |             tester.sendMessages (50);
  |             tester.consumeMessages (50);
  | 
  |         }
  |         catch (Exception e)
  |         {
  |            e.printStackTrace();
  |         }
  |         finally
  |         {
  |             try
  |             {
  |                 if (tester != null)
  |                     tester.cleanup ();
  |             }
  |             catch (Exception e)
  |             {
  |                 e.printStackTrace();
  |             }
  |         }
  |     }
  | 
  |     /**
  |      * Method: cleanup
  |      *
  |      *
  |      * @throws JMSException
  |      */
  | 
  |     void cleanup () throws JMSException
  |     {
  |     }
  | 
  |     /**
  |      * Method: consumeMessages
  |      *
  |      *
  |      * @param count
  |      */
  | 
  |     void consumeMessages (int count)
  |     {
  |         TestListener listener1 = null;
  |         TestListener listener2 = null;
  | 
  |         try
  |         {
  |             listener1 = new TestListener ("Listener_1", m_initialContext_1, m_factory_1);
  |             listener2 = new TestListener ("Listener_2", m_initialContext_1, m_factory_1);
  |             listener1.m_jmsConnection.start ();
  |             listener2.m_jmsConnection.start ();
  | 
  | 
  |             while ((listener1.m_count + listener2.m_count) < count)
  |             {
  |                 Thread.sleep (1000);
  |             }
  |             
  |         }
  |         catch (Exception e)
  |         {
  |             e.printStackTrace();
  |         }
  |         finally
  |         {
  |             try
  |             {
  |                 if (listener1 != null)
  |                     listener1.cleanup ();
  | 
  |                 if (listener2 != null)
  |                     listener2.cleanup ();
  |             }
  |             catch (Exception e)
  |             {
  |                e.printStackTrace();
  |             }
  |         }
  |     }
  | 
  |     /**
  |      * Method: sendMessages
  |      *
  |      *
  |      * @param count
  |      */
  | 
  |     void sendMessages (int count)
  |     {
  |         try
  |         {
  |             m_producerQueue = (Queue) m_initialContext_1.lookup ("/queue/testDistributedQueue");
  |             Connection conn1 = m_factory_1.createConnection ();
  |             Session session1 = (QueueSession) conn1.createSession (false, Session.AUTO_ACKNOWLEDGE);
  |             MessageProducer producer1 = session1.createProducer (m_producerQueue);
  | 
  |             Connection conn2 = m_factory_1.createConnection ();
  |             Session session2 = (QueueSession) conn2.createSession (false, Session.AUTO_ACKNOWLEDGE);
  |             MessageProducer producer2 = session2.createProducer (m_producerQueue);
  | 
  |             for (int i = 0; i < count; i++)
  |             {
  |                 System.out.println("Queueing message: Test_Message_" + i);
  |                 System.out.flush();
  |                 TextMessage tm = session1.createTextMessage ("Test_Message_" + i);
  |                 if (i%2 == 0)
  |                 {
  |                    producer1.send (tm);
  |                 }
  |                 else
  |                 {
  |                    producer2.send (tm);
  |                 }
  |             }
  |             
  |             conn1.close();
  |             
  |             conn2.close();
  |         }
  |         catch (Exception e)
  |         {
  |            e.printStackTrace();
  |         }
  |     }
  | }
  | 
  | /**
  |  * Class TestListener
  |  */
  | 
  | class TestListener implements MessageListener
  | {
  |     int m_count                = 0;
  |     MessageConsumer m_consumer = null;
  |     Queue m_responseQueue      = null;
  |     Connection m_jmsConnection = null;
  |     QueueSession m_jmsSession  = null;
  |     String m_name              = null;
  | 
  |     /**
  |      * Constructor: TestListener
  |      *
  |      *
  |      * @param name
  |      * @param ic
  |      * @param factory
  |      *
  |      * @throws JMSException
  |      * @throws NamingException
  |      */
  | 
  |     TestListener (String name,
  |                   InitialContext ic,
  |                   ConnectionFactory factory) throws NamingException, JMSException
  |     {
  |         m_responseQueue = (Queue) ic.lookup ("/queue/testDistributedQueue");
  |         m_jmsConnection = factory.createConnection ();
  |         m_jmsSession    = (QueueSession) m_jmsConnection.createSession (false, Session.AUTO_ACKNOWLEDGE);
  |         m_consumer      = m_jmsSession.createConsumer (m_responseQueue);
  |         m_consumer.setMessageListener (this);
  |         m_name = name;
  |     }
  | 
  |     /**
  |      * Method: cleanup
  |      *
  |      *
  |      * @throws JMSException
  |      */
  | 
  |     void cleanup () throws JMSException
  |     {
  |         if (m_consumer != null)
  |             m_consumer.close ();
  |         if (m_jmsSession != null)
  |             m_jmsSession.close ();
  |         if (m_jmsConnection != null)
  |             m_jmsConnection.close ();
  |     }
  | 
  |     /**
  |      * Method: onMessage
  |      *
  |      *
  |      * @param arg0
  |      */
  | 
  |     public void onMessage (Message arg0)
  |     {
  |         try
  |         {
  |             TextMessage msg = (TextMessage) arg0;
  |             System.out.println(m_name + " received message: " + msg.getText ());
  |             Thread.sleep (1000);
  |             msg.acknowledge();
  |             m_count++;
  |         }
  |         catch (Exception e)
  |         {
  |             e.printStackTrace();
  |         }
  |     }
  | }
  | 

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4144643#4144643

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4144643



More information about the jboss-user mailing list