JBoss Community

How to make the JMS receiver client wait till the next message arrives in the queue?

created by chithu21 in JBoss Messaging Development - View the full discussion

Hi,

 

I have a receiver client program and a sender program.

 

Receiver Program:

 

 

import java.io.BufferedReader;

import java.io.DataInput;

import java.io.DataInputStream;

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

import java.io.InputStreamReader;

import java.sql.SQLException;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.Queue;

import javax.jms.QueueReceiver;

import javax.jms.QueueSession;

import javax.jms.TextMessage;

import javax.naming.NamingException;

 

import com.dcat2.common.Config;

import com.dcat2.common.Constants;

import com.dcat2.common.ReadConfig;

import com.dcat2.logger.LoggerFactory;

import com.dcat2.logger.LoggerImp;

import com.dcat2.datatransfer.DbConnection;

 

 

import EDU.oswego.cs.dl.util.concurrent.CountDown;

 

/**.

* This class contains methods to read messages from the Queue

* and inserts them to receiving DB

*

* This class needs to be updated after getting spec from XIB,

* and is just retained as place holder, it will not work on Jboss 6

*

* @author Chithra V S

*

*/

public class MessageReceiver implements MessageListener {

 

    /**.

     * Initialize the logger

     */

    private static com.dcat2.logger.LogInterface log =

            (LoggerImp) LoggerFactory.getLogger(MessageReceiver.class);

 

    /**.

     * Initialize the counter

     */

    static final int N = 1;

 

    /**.

     * Initialize the CountDown which is used to

     * notify a driver when all threads are complete.

     */

    static CountDown done = new CountDown(N);

 

    /**.

     * Initialize the DCAT Centre Variable

     */

    private static String strDcatCentre = "";

 

 

    /**.

     * Initialize the Rejection Table Name Variable.

     */

    private String strRejectionTable = "";

    /**.

     * Initialize the Counter for DB.

     */

    private static int count = 0 ;

 

    /**.

     * Initialize the ReadConfig

     */

    private static ReadConfig readConfig = new ReadConfig();

 

    /**.

     * Read the properties file

     */

    private static Config config = readConfig.readPropertiesFile();

    static JMSConnector jmsConnector = new JMSConnector();

    private boolean quit = false;

 

    /**.

     * Constructor

     */

    public MessageReceiver() {

        if (log == null) {

            System.err.println(

                    "Failure Creating Logtrace Object!!!");

        }

    }

 

    /**.

     * Inner class which implements MessageListener

     * and receives messages from the Queue

     *

     */

   public static class ExListener implements MessageListener {

 

        DbConnection dbConnection = new DbConnection();

        /**.

         *  @see javax.jms.MessageListener#onMessage(javax.jms.Message)

         *  @param msg - The JMS Message

         */

        public void onMessage(Message msg) {

            log.debug("Inside OnMessage");

            //done.release();

            try {

                String strReceivedMsg = "";

                if (msg instanceof TextMessage) {

                    strReceivedMsg = ((TextMessage) msg).getText();

                } else {

                    strReceivedMsg = msg.toString();

                }

                log.debug("onMessage, recv text=" + strReceivedMsg);

               

                if (strReceivedMsg.equalsIgnoreCase("quit")) {

                    synchronized (this) {

                        quit = true;

                        this.notifyAll(); // Notify main thread to quit

                    }

                }

                dbConnection.insertToReceivingDb(strReceivedMsg,

                        getStrDcatCentre(), config.getRecvMsgTable());

                count++;

            } catch (JMSException e) {

                log.error(

                        "An exception occured while receiving from Queue : "

                                                        + e);

            }

            finally {

                try {

                    //MessageReceiver.done.acquire();

                    jmsConnector.closePTP();

                }/* catch (InterruptedException e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                }*/ catch (JMSException e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                }

               

            }

        }

}

    /**.

     * This methods throws an exception when an exception occurs in

     * the onMessage method

     * @param exception - Exception

     */

    public void onException(final JMSException exception) {

        log.error("An error occurred: " + exception);

    }

 

 

    /**.

     * This method is used to receive messages from the in bound queue

     * and insert them to the receiving database

     * @param None

     * @throws JMSException - JMS Exception

     * @throws NamingException - Naming Exception

     * @throws InterruptedException - Interrupted Exception

     * @throws ClassNotFoundException - Class Not Found Exception

     * @throws SQLException - SQL Exception

     */

    public final void receiveMessage() throws JMSException, NamingException,

            InterruptedException, ClassNotFoundException, SQLException {

 

        log.debug("Begin receiveMessage method");

 

       

 

        QueueObject queueObj = jmsConnector.setupPTP(config.getProviderUrl(),

                config.getXibToCatQueue());

        QueueSession session = queueObj.getQueueSession();

        Queue xibToCatQueue = queueObj.getQueue();

 

        log.debug("Queue Name : " + xibToCatQueue.getQueueName());

 

        // Set the async listener for xibToCatQueue

        QueueReceiver recv = session.createReceiver(xibToCatQueue);

        log.info("************Receiver Is Ready To Receive Messages************");

      recv.setMessageListener(new ExListener());

    

        synchronized (recv) {

            while (!quit) {

                try {

                    recv.wait();

                } catch (InterruptedException ie) {

                }

            }

        }

    

        recv.close();

       

        //log.info(count + " row/rows inserted to " + config.getRecvMsgTable() + " table");

        //log.info("************Receiving Of Messages Completed************");

        //log.debug("End receiveMessage method");

    }

 

    /**.

     * This method returns the value of Dcat Centre

     * @param None

     * @return strDcatCentre

     * @exception None

     */

    public static String getStrDcatCentre() {

        return strDcatCentre;

    }

 

 

    /**.

     * This method sets the value of Dcat Centre

     * @param strDcatCentre = DCAT Centre Code

     * @exception None

     */

    public final void setStrDcatCentre(final String strDcatCentre) {

        MessageReceiver.strDcatCentre = strDcatCentre;

    }

     

   public static void main(final String[] args) throws JMSException,

            NamingException, InterruptedException, ClassNotFoundException,

            SQLException {

 

    

        MessageReceiver msgReceiver = new MessageReceiver();

     

        msgReceiver.receiveMessage();

    }

}

 

Sender:

 

package com.dcat2.messaging.sendreceive;

 

import java.util.ArrayList;

import java.util.Iterator;

 

import javax.jms.DeliveryMode;

import javax.jms.JMSException;

import javax.jms.Queue;

import javax.jms.QueueSender;

import javax.jms.QueueSession;

import javax.jms.TextMessage;

import javax.naming.NamingException;

 

import com.dcat2.common.Config;

import com.dcat2.common.Constants;

import com.dcat2.common.ReadConfig;

import com.dcat2.logger.LoggerFactory;

import com.dcat2.logger.LoggerImp;

import com.dcat2.datatransfer.DbConnection;

 

/**

* This class has methods to send XML messages to the XIB Queue. Note:Once the

* XIB details are available, this class may need modification

*

* @author Chithra V S

*

*/

public class MessageSender {

 

    /**.

     * Initialize the logger

     */

    private static com.dcat2.logger.LogInterface log =

            (LoggerImp) LoggerFactory.getLogger(MessageSender.class);

 

    /**.

     * Constructor

     */

    public MessageSender() {

        if (log == null) {

            System.err.println("Failure Creating Logtrace Object!!!");

        }

    }

 

    /**.

     * This method sends xml message to the out bound queue

     * @param strKey - Key received at run time

     * @param strDcatCentre - DCAT Center Code received at run time

     * @throws JMSException - JMS Exception

     * @throws NamingException - Naming Exception

     */

    public final void sendMessage(final String strKey, final String strDcatCentre)

            throws JMSException, NamingException {

        log.debug("Begin sendMessage Method");

 

        ReadConfig readValues = new ReadConfig();

        Config config = readValues.readPropertiesFile();

        JMSConnector jmsConnector = new JMSConnector();

        String strStatus = "";

        String strXMLMessage = "";

 

        /*

         * Read segments from SendMsg Db Table

         * and sends to the Queue

         */

        DbConnection dbConnection = new DbConnection();

 

        /*QueueObject queueObj = jmsConnector.setupPTP(

                config.getProviderUrlXib(),

                config.getCatToXibQueue());*/

        QueueObject queueObj = jmsConnector.setupPTP(

                config.getProviderUrlXib(),

                config.getXibToCatQueue());

        QueueSession session = queueObj.getQueueSession();

        Queue catToXibQueue = queueObj.getQueue();

        log.debug("Queue Name: " + catToXibQueue.getQueueName());

        QueueSender sender = session.createSender(catToXibQueue);

        sender.setDeliveryMode(DeliveryMode.PERSISTENT);

 

        try {

            ArrayList<String> msgList = dbConnection.readFromSendMsgDb(

                    config.getSendMsgDb(), strKey, strDcatCentre);

            Iterator<String> iterator = msgList.iterator();

            log.info("************Sending Of Messages Started************");

            while (iterator.hasNext()) {

                    strXMLMessage = iterator.next();

                    log.debug("XML Message to be sent : " + strXMLMessage);

                    TextMessage tm = session.createTextMessage(strXMLMessage);

                    sender.send(tm);

                    log.info("Sent Message =" + tm.getText());

                    strStatus = Constants.RETURN_CODE_OK;

                }

        } catch (JMSException e) {

            // write to log file if the message was not sent

            log.error("The message: " + strXMLMessage + " was not sent : " + e);

            strStatus = Constants.RETURN_CODE_NOK;

        } finally {

            if (sender != null) {

                sender.close();

            }

            jmsConnector.closePTP();

        }

        log.info("************Sending Of Messages Completed************");

        log.debug("End sendMessage Method");

    }

 

     /**.

     * Main method

     * @param args - String array of command line arguments

     * @throws JMSException - JMS Exception

     * @throws NamingException - Naming Exception

     */

    public static void main(final String[] args) throws JMSException,

            NamingException {

        String strKey = args[0];

        String strDcatCentre = args[1];

        MessageSender messageSender = new MessageSender();

        messageSender.sendMessage(strKey, strDcatCentre);

    }

}

 

JMSConnector:

 

package com.dcat2.messaging.sendreceive;

 

import java.util.Properties;

 

import javax.jms.JMSException;

import javax.jms.Queue;

import javax.jms.QueueConnection;

import javax.jms.QueueConnectionFactory;

import javax.jms.QueueSession;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

 

import com.dcat2.common.Config;

import com.dcat2.common.ReadConfig;

 

/**.

* This class contains methods to setup a point to point connection

* to a JMS Queue

*

* @author Chithra V S

*

*/

public class JMSConnector {

 

    /**.

     * Queue Connection Object

     */

    private QueueConnection connection;

 

    /**.

     * Queue Session Object

     */

    private QueueSession session;

 

    /**.

     * This method is used to setup a Point To Point Connection

     * @param contextUrl - URL of the Queue Provider

     * @param queueName - Name of the Queue

     * @return QueueObject

     * @throws JMSException - JMS Exception

     * @throws NamingException - Naming Exception

     */

    public final QueueObject setupPTP(

            final String contextUrl, final String queueName)

            throws JMSException, NamingException {

 

        ReadConfig readValues = new ReadConfig();

        Config config = readValues.readPropertiesFile();

        Properties env = new Properties();

        env.put(Context.INITIAL_CONTEXT_FACTORY, config.getContextFactory());

        env.put(Context.SECURITY_PRINCIPAL, config.getJmsUserName());

        env.put(Context.SECURITY_CREDENTIALS, config.getJmsUserPwd());

        env.put(Context.PROVIDER_URL, contextUrl);

        InitialContext iniCtx = new InitialContext(env);

 

        QueueConnectionFactory qcf = (QueueConnectionFactory)iniCtx.lookup(config.getJmsFactory());

 

        connection = qcf.createQueueConnection();

        String lookUpObj = "java:/" + queueName;

        Queue xibToCatQueue =  (Queue) iniCtx.lookup(lookUpObj);

 

        session = connection.createQueueSession(

                false, QueueSession.AUTO_ACKNOWLEDGE);

        connection.start();

 

        return new QueueObject(session, xibToCatQueue);

    }

 

    /**.

     * This method is used to close the Point To Point Connection

     * @param None

     * @throws JMSException - JMS Exception

     */

    public final void closePTP() throws JMSException {

        if (session != null) {

            session.close();

        }

        if (connection != null) {

            connection.stop();

        }

    }

}

 

 

I ran the MessageReceiver & then ran the MessageSender. The receiver program didnt terminate. But it also didnt trigger the onMessage method to receive the messages.

 

Any idea what is wrong?

Reply to this message by going to Community

Start a new discussion in JBoss Messaging Development at Community