[jboss-dev-forums] [JBoss Messaging Development] - How to make the JMS receiver client wait till the next message arrives in the queue?

chithu21 do-not-reply at jboss.com
Thu Jul 12 06:05:27 EDT 2012


chithu21 [https://community.jboss.org/people/chithu21] created the discussion

"How to make the JMS receiver client wait till the next message arrives in the queue?"

To view the discussion, visit: https://community.jboss.org/message/747708#747708

--------------------------------------------------------------
Hi,

I have a receiver client program and a sender program.

Receiver Program:


import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.SQLException;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;

import com.dcat2.common.Config;
import com.dcat2.common.Constants;
import com.dcat2.common.ReadConfig;
import com.dcat2.logger.LoggerFactory;
import com.dcat2.logger.LoggerImp;
import com.dcat2.datatransfer.DbConnection;


import EDU.oswego.cs.dl.util.concurrent.CountDown;

/**.
 * This class contains methods to read messages from the Queue
 * and inserts them to receiving DB
 *
 * This class needs to be updated after getting spec from XIB,
 * and is just retained as place holder, it will not work on Jboss 6
 * 
 * @author Chithra V S
 *
 */
public class MessageReceiver implements MessageListener {

    /**.
     * Initialize the logger
     */
    private static com.dcat2.logger.LogInterface log =
            (LoggerImp) LoggerFactory.getLogger(MessageReceiver.class);

    /**.
     * Initialize the counter
     */
    static final int N = 1;

    /**.
     * Initialize the CountDown which is used to
     * notify a driver when all threads are complete.
     */
    static CountDown done = new CountDown(N);

    /**.
     * Initialize the DCAT Centre Variable
     */
    private static String strDcatCentre = "";


    /**.
     * Initialize the Rejection Table Name Variable.
     */
    private String strRejectionTable = "";
    /**.
     * Initialize the Counter for DB.
     */
    private static int count = 0 ;

    /**.
     * Initialize the ReadConfig
     */
    private static ReadConfig readConfig = new ReadConfig();

    /**.
     * Read the properties file
     */
    private static Config config = readConfig.readPropertiesFile();
    static JMSConnector jmsConnector = new JMSConnector();
    private boolean quit = false;

    /**.
     * Constructor
     */
    public MessageReceiver() {
        if (log == null) {
            System.err.println(
                    "Failure Creating Logtrace Object!!!");
        }
    }

    /**.
     * Inner class which implements MessageListener
     * and receives messages from the Queue
     *
     */
   public static class ExListener implements MessageListener {

        DbConnection dbConnection = new DbConnection();
        /**.
         *  @see javax.jms.MessageListener#onMessage(javax.jms.Message)
         *  @param msg - The JMS Message
         */
        public void onMessage(Message msg) {
            log.debug("Inside OnMessage");
            //done.release();
            try {
                String strReceivedMsg = "";
                if (msg instanceof TextMessage) {
                    strReceivedMsg = ((TextMessage) msg).getText();
                } else {
                    strReceivedMsg = msg.toString();
                }
                log.debug("onMessage, recv text=" + strReceivedMsg);

                if (strReceivedMsg.equalsIgnoreCase("quit")) {
                    synchronized (this) {
                        quit = true;
                        this.notifyAll(); // Notify main thread to quit
                    }
                }
                dbConnection.insertToReceivingDb(strReceivedMsg,
                        getStrDcatCentre(), config.getRecvMsgTable());
                count++;
            } catch (JMSException e) {
                log.error(
                        "An exception occured while receiving from Queue : "
                                                        + e);
            }
            finally {
                try {
                    //MessageReceiver.done.acquire();
                    jmsConnector.closePTP();
                }/* catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }*/ catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }
}
    /**.
     * This methods throws an exception when an exception occurs in
     * the onMessage method
     * @param exception - Exception
     */
    public void onException(final JMSException exception) {
        log.error("An error occurred: " + exception);
    }


    /**.
     * This method is used to receive messages from the in bound queue
     * and insert them to the receiving database
     * @param None
     * @throws JMSException - JMS Exception
     * @throws NamingException - Naming Exception
     * @throws InterruptedException - Interrupted Exception
     * @throws ClassNotFoundException - Class Not Found Exception
     * @throws SQLException - SQL Exception
     */
    public final void receiveMessage() throws JMSException, NamingException,
            InterruptedException, ClassNotFoundException, SQLException {

        log.debug("Begin receiveMessage method");



        QueueObject queueObj = jmsConnector.setupPTP(config.getProviderUrl(),
                config.getXibToCatQueue());
        QueueSession session = queueObj.getQueueSession();
        Queue xibToCatQueue = queueObj.getQueue();

        log.debug("Queue Name : " + xibToCatQueue.getQueueName());

        // Set the async listener for xibToCatQueue
        QueueReceiver recv = session.createReceiver(xibToCatQueue);
        log.info("************Receiver Is Ready To Receive Messages************");
      recv.setMessageListener(new ExListener());

        synchronized (recv) {
            while (!quit) {
                try {
                    recv.wait();
                } catch (InterruptedException ie) {
                }
            }
        }

        recv.close();

        //log.info(count + " row/rows inserted to " + config.getRecvMsgTable() + " table");
        //log.info("************Receiving Of Messages Completed************");
        //log.debug("End receiveMessage method");
    }

    /**.
     * This method returns the value of Dcat Centre
     * @param None
     * @return strDcatCentre
     * @exception None
     */
    public static String getStrDcatCentre() {
        return strDcatCentre;
    }


    /**.
     * This method sets the value of Dcat Centre
     * @param strDcatCentre = DCAT Centre Code
     * @exception None
     */
    public final void setStrDcatCentre(final String strDcatCentre) {
        MessageReceiver.strDcatCentre = strDcatCentre;
    }

   public static void main(final String[] args) throws JMSException,
            NamingException, InterruptedException, ClassNotFoundException,
            SQLException {


        MessageReceiver msgReceiver = new MessageReceiver();

        msgReceiver.receiveMessage();
    }
}

Sender:

package com.dcat2.messaging.sendreceive;

import java.util.ArrayList;
import java.util.Iterator;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.NamingException;

import com.dcat2.common.Config;
import com.dcat2.common.Constants;
import com.dcat2.common.ReadConfig;
import com.dcat2.logger.LoggerFactory;
import com.dcat2.logger.LoggerImp;
import com.dcat2.datatransfer.DbConnection;

/**
 * This class has methods to send XML messages to the XIB Queue. Note:Once the
 * XIB details are available, this class may need modification
 *
 * @author Chithra V S
 *
 */
public class MessageSender {

    /**.
     * Initialize the logger
     */
    private static com.dcat2.logger.LogInterface log =
            (LoggerImp) LoggerFactory.getLogger(MessageSender.class);

    /**.
     * Constructor
     */
    public MessageSender() {
        if (log == null) {
            System.err.println("Failure Creating Logtrace Object!!!");
        }
    }

    /**.
     * This method sends xml message to the out bound queue
     * @param strKey - Key received at run time
     * @param strDcatCentre - DCAT Center Code received at run time
     * @throws JMSException - JMS Exception
     * @throws NamingException - Naming Exception
     */
    public final void sendMessage(final String strKey, final String strDcatCentre)
            throws JMSException, NamingException {
        log.debug("Begin sendMessage Method");

        ReadConfig readValues = new ReadConfig();
        Config config = readValues.readPropertiesFile();
        JMSConnector jmsConnector = new JMSConnector();
        String strStatus = "";
        String strXMLMessage = "";

        /*
         * Read segments from SendMsg Db Table
         * and sends to the Queue
         */
        DbConnection dbConnection = new DbConnection();

        /*QueueObject queueObj = jmsConnector.setupPTP(
                config.getProviderUrlXib(),
                config.getCatToXibQueue());*/
        QueueObject queueObj = jmsConnector.setupPTP(
                config.getProviderUrlXib(),
                config.getXibToCatQueue());
        QueueSession session = queueObj.getQueueSession();
        Queue catToXibQueue = queueObj.getQueue();
        log.debug("Queue Name: " + catToXibQueue.getQueueName());
        QueueSender sender = session.createSender(catToXibQueue);
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);

        try {
            ArrayList<String> msgList = dbConnection.readFromSendMsgDb(
                    config.getSendMsgDb(), strKey, strDcatCentre);
            Iterator<String> iterator = msgList.iterator();
            log.info("************Sending Of Messages Started************");
            while (iterator.hasNext()) {
                    strXMLMessage = iterator.next();
                    log.debug("XML Message to be sent : " + strXMLMessage);
                    TextMessage tm = session.createTextMessage(strXMLMessage);
                    sender.send(tm);
                    log.info("Sent Message =" + tm.getText());
                    strStatus = Constants.RETURN_CODE_OK;
                }
        } catch (JMSException e) {
            // write to log file if the message was not sent
            log.error("The message: " + strXMLMessage + " was not sent : " + e);
            strStatus = Constants.RETURN_CODE_NOK;
        } finally {
            if (sender != null) {
                sender.close();
            }
            jmsConnector.closePTP();
        }
        log.info("************Sending Of Messages Completed************");
        log.debug("End sendMessage Method");
    }

     /**.
     * Main method
     * @param args - String array of command line arguments
     * @throws JMSException - JMS Exception
     * @throws NamingException - Naming Exception
     */
    public static void main(final String[] args) throws JMSException,
            NamingException {
        String strKey = args[0];
        String strDcatCentre = args[1];
        MessageSender messageSender = new MessageSender();
        messageSender.sendMessage(strKey, strDcatCentre);
    }
}

JMSConnector:

package com.dcat2.messaging.sendreceive;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import com.dcat2.common.Config;
import com.dcat2.common.ReadConfig;

/**.
 * This class contains methods to setup a point to point connection
 * to a JMS Queue
 *
 * @author Chithra V S
 *
 */
public class JMSConnector {

    /**.
     * Queue Connection Object
     */
    private QueueConnection connection;

    /**.
     * Queue Session Object
     */
    private QueueSession session;

    /**.
     * This method is used to setup a Point To Point Connection
     * @param contextUrl - URL of the Queue Provider
     * @param queueName - Name of the Queue
     * @return QueueObject
     * @throws JMSException - JMS Exception
     * @throws NamingException - Naming Exception
     */
    public final QueueObject setupPTP(
            final String contextUrl, final String queueName)
            throws JMSException, NamingException {

        ReadConfig readValues = new ReadConfig();
        Config config = readValues.readPropertiesFile();
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, config.getContextFactory());
        env.put(Context.SECURITY_PRINCIPAL, config.getJmsUserName());
        env.put(Context.SECURITY_CREDENTIALS, config.getJmsUserPwd());
        env.put(Context.PROVIDER_URL, contextUrl);
        InitialContext iniCtx = new InitialContext(env);

        QueueConnectionFactory qcf = (QueueConnectionFactory)iniCtx.lookup(config.getJmsFactory());

        connection = qcf.createQueueConnection();
        String lookUpObj = "java:/" + queueName;
        Queue xibToCatQueue =  (Queue) iniCtx.lookup(lookUpObj);

        session = connection.createQueueSession(
                false, QueueSession.AUTO_ACKNOWLEDGE);
        connection.start();

        return new QueueObject(session, xibToCatQueue);
    }

    /**.
     * This method is used to close the Point To Point Connection
     * @param None
     * @throws JMSException - JMS Exception
     */
    public final void closePTP() throws JMSException {
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.stop();
        }
    }
}


I ran the MessageReceiver & then ran the MessageSender. The receiver program didnt terminate. But it also didnt trigger the onMessage method to receive the messages.

Any idea what is wrong?
--------------------------------------------------------------

Reply to this message by going to Community
[https://community.jboss.org/message/747708#747708]

Start a new discussion in JBoss Messaging Development at Community
[https://community.jboss.org/choose-container!input.jspa?contentType=1&containerType=14&container=2043]

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/jboss-dev-forums/attachments/20120712/8ca434bd/attachment.html 


More information about the jboss-dev-forums mailing list