[jboss-user] [JBoss Messaging] - Re: How do I empty a JBoss queue?

asaur do-not-reply at jboss.com
Mon Aug 4 09:55:21 EDT 2008


Oops, I forgot to include the client code. Here it is (I have cut out the package and import sections for brevity):

=======================
/**
 * Servlet context listener that starts {@code NUMBER_OF_LISTENERS} background
 * threads when the web application is initialised and asks them to stop when
 * it is destroyed. Every thread attaches an asynchronous JMS message listener
 * to the same queue; the queue is chosen from {@link #QUEUE_APPS} using the
 * digit found immediately before the first dot of the local host name.
 */
public class PPMessageConsumer implements ServletContextListener {
    private static final int NUMBER_OF_LISTENERS = 5;

    /** One slot per listener thread; a slot stays null until its thread has been created. */
    AppListenerThread[] consumer = new AppListenerThread[NUMBER_OF_LISTENERS];

    /** JNDI environment loaded from the jndi.properties resource during initialisation. */
    Properties context = new Properties();

    private static final Logger log = Logger.getLogger("messaging");

    private static final String[] QUEUE_APPS = new String[] {
        "/queue/messageIdQueueApp1",
        "/queue/messageIdQueueApp2",
        "/queue/messageIdQueueApp3",
        "/queue/messageIdQueueApp4",
        "/queue/messageIdQueueApp5"
    };

    /**
     * Flags every started listener thread for termination and interrupts it so
     * that it leaves its sleep immediately.
     *
     * @param sce the servlet context event (unused)
     */
    public void contextDestroyed(ServletContextEvent sce) {
        for (int i = 0; i < NUMBER_OF_LISTENERS; i++) {
            AppListenerThread listener = consumer[i];
            // Slots are null when initialisation failed before the threads were created.
            if (listener != null) {
                listener.should_die = true;
                listener.interrupt();
            }
        }
    }

    /**
     * Loads the JNDI properties, derives the queue from the local host name and
     * starts the listener threads. Any failure is logged and leaves the
     * listeners unstarted.
     *
     * @param sce the servlet context event (unused)
     */
    public void contextInitialized(ServletContextEvent sce) {
        // If the resource is missing the wrapped stream is null and load() fails
        // with an IOException, which is handled below.
        BufferedInputStream fs = new BufferedInputStream(this.getClass().getResourceAsStream("/parperfeito/service/locator/jndi.properties"));

        try {
            context.load(fs);

            InetAddress addr = InetAddress.getLocalHost();
            log.debug("Host addr = " + addr.getHostAddress());
            log.debug("Host name = " + addr.getHostName());

            String name = addr.getHostName();
            int app = resolveAppIndex(name);
            if (app < 0) {
                log.error("[Message Consumer] Cannot derive an application queue from host name '"
                        + name + "'; no listeners started");
                return;
            }

            startListeners(QUEUE_APPS[app]);

        } catch (UnknownHostException ex) {
            log.error("[Message Consumer] Unable to resolve the local host", ex);
        } catch (IOException ex) {
            log.error("[Message Consumer] Unable to load the JNDI properties", ex);
        } finally {
            try {
                fs.close();
            } catch (IOException ex) {
                // Best effort: nothing useful can be done if closing the stream fails.
                log.debug("[Message Consumer] Failed to close the JNDI properties stream", ex);
            }
        }

    }

    /**
     * Console debugging aid: prints the local host address and name, then
     * starts the listener threads against the first queue in {@link #QUEUE_APPS}.
     */
    public void testHost() {
        InetAddress addr;
        try {
            addr = InetAddress.getLocalHost();
            System.out.println("Host addr = " + addr.getHostAddress());
            System.out.println("Host name = " + addr.getHostName());
            startListeners(QUEUE_APPS[0]);
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
    }

    /**
     * Maps a host name such as {@code pp5.parperfeito.com.br} to a zero-based
     * index into {@link #QUEUE_APPS}, using the single character that precedes
     * the first dot.
     *
     * @param hostName the local host name
     * @return the queue index, or -1 when the name has no character before a
     *         dot, that character is not a number, or the number is out of range
     */
    private static int resolveAppIndex(String hostName) {
        int pos = hostName.indexOf(".");
        if (pos < 1) {
            // No dot, or the dot is the first character: there is no digit to read.
            return -1;
        }
        int index;
        try {
            index = Integer.parseInt(hostName.substring(pos - 1, pos)) - 1;
        } catch (NumberFormatException ex) {
            return -1;
        }
        if (index < 0 || index >= QUEUE_APPS.length) {
            return -1;
        }
        return index;
    }

    /** Creates and starts one listener thread per slot, all bound to the given queue. */
    private void startListeners(String queueName) {
        for (int i = 0; i < NUMBER_OF_LISTENERS; i++) {
            consumer[i] = new AppListenerThread(queueName);
            consumer[i].start();
        }
    }

    /**
     * Thread that keeps a JMS connection with an asynchronous message listener
     * attached to one queue. It wakes up periodically to (re)build the
     * connection when it is not initialised or when a JMS error was reported,
     * and exits once {@link #should_die} is set.
     */
    private class AppListenerThread extends Thread  implements ExceptionListener {
        private static final String CONNECTION_FACTORY_NAME = "/ConnectionFactory";
        private static final long SLEEP_MILLIS = 50000L;

        /** Set by the enclosing listener from another thread, hence volatile. */
        public volatile boolean should_die = false;
        /** Set by the JMS provider's callback thread in onException, hence volatile. */
        private volatile boolean recycle = false;
        private boolean initialised = false;
        private String queue_name = null;

        public AppListenerThread(String app_queue) {
            queue_name = app_queue;
        }

        @Override
        public void run() {
            InitialContext ic = null;
            ConnectionFactory factory = null;
            Connection connection = null;
            Queue message_queue = null;
            Session session = null;
            MessageConsumer messageConsumer = null;

            log.info("[Message Consumer] Destination queue is " + queue_name);

            if (queue_name != null) {
                while (!should_die) {
                    if (!initialised) {
                        // finding factory to create a connection and the queue to listen to
                        try {
                            ic = new InitialContext(context);
                            factory = (ConnectionFactory) ic.lookup(CONNECTION_FACTORY_NAME);
                            message_queue = (Queue) ic.lookup(queue_name);
                        } catch (NamingException e) {
                            log.error("[Message Consumer] JNDI lookup failed for " + queue_name, e);
                        } finally {
                            if (ic != null) {
                                try {
                                    ic.close();
                                } catch (NamingException e) {
                                    log.debug("[Message Consumer] Failed to close the initial context", e);
                                }
                                ic = null;
                            }
                        }

                        // setting listener up
                        try {
                            if (factory != null && message_queue != null) {
                                connection = factory.createConnection();
                                connection.setExceptionListener(this);
                                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                                messageConsumer = session.createConsumer(message_queue);

                                // this must be asynchronous, so we need to implement a message listener
                                messageConsumer.setMessageListener(new PPMessageListener());
                                connection.start();
                                initialised = true;
                            }
                        } catch (JMSException e) {
                            log.error("[Message Consumer] Failed to set up the listener on " + queue_name, e);
                            // Tear down whatever was partially created before retrying.
                            recycle = true;
                        }
                    }
                    if (recycle) {
                        // Close in consumer -> session -> connection order; each step is
                        // best effort so that one failure does not prevent the others.
                        if (messageConsumer != null) {
                            try {
                                messageConsumer.setMessageListener(null);
                                messageConsumer.close();
                            } catch (Exception ex) {
                                log.debug("[Message Consumer] Failed to close the consumer", ex);
                            }
                        }
                        if (session != null) {
                            try {
                                session.close();
                            } catch (Exception ex) {
                                log.debug("[Message Consumer] Failed to close the session", ex);
                            }
                        }
                        if (connection != null) {
                            try {
                                connection.close();
                            } catch (Exception ex) {
                                log.debug("[Message Consumer] Failed to close the connection", ex);
                            }
                        }
                        messageConsumer = null;
                        session = null;
                        connection = null;
                        factory = null;
                        message_queue = null;
                        initialised = false;
                        recycle = false;
                    }
                    log.debug("[Message Consumer] Consumer thread going to sleep at " + new java.util.Date());
                    try {
                        Thread.sleep(SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // The interrupt is only a wake-up call; the loop condition
                        // re-reads should_die to decide whether to exit.
                        log.info("[Message Consumer] Oh my! Someone called!");
                    }
                    log.debug("[Message Consumer] Consumer thread woke up at " + new java.util.Date());
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    log.debug("[Message Consumer] Failed to close the connection on shutdown", ex);
                }
            }
            log.info("[Message Consumer] Consumer thread finishing processing at " + new java.util.Date());
        }

        /**
         * Called by the JMS provider when it detects a problem with the
         * connection; requests a teardown and rebuild on the next wake-up.
         *
         * @param ex the reported JMS problem
         */
        public void onException(JMSException ex) {
            log.error("[Message Consumer] JMS connection problem reported for " + queue_name, ex);
            recycle = true;
        }
    }
}
=====================================

Hope this can somehow shed some light...



View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4168453#4168453

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4168453



More information about the jboss-user mailing list