[jboss-user] [JBoss Messaging] - Removing a MessageListener doesn't remove its associated thr

davidrh do-not-reply at jboss.com
Fri Jul 28 03:20:16 EDT 2006


We are trying to close a session that has a consumer with a MessageListener registered on it.  The consumer and the session both close, but the thread that was created for the MessageListener does not die.  We have tried explicitly setting the MessageListener to null on the consumer, but this actually creates another thread!  Setting the MessageListener to null on the session throws an exception.  Even closing the connection does not remove the MessageListener threads.  I've pasted code below that will demonstrate this behaviour - we run in debug mode in Eclipse and can see the threads being created.

As an aside, the reason that we are trying to close the session that has the MessageListener is that we have noticed that our listeners seem to stop listening after a period of time (in the order of minutes under heavy load).

In our application, we actually create a configurable number of sessions with a listener on each which corresponds to how many threads we want to be working the queue (in a similar way to the code below).  Through our logs, we can see that over time, the receivers stop receiving until there is only one session left processing messages.

We are catching all exceptions in our MessageListener, so they don't propogate up to JBoss Messaging, so as far as JBoss is concerned we always consume the message (we are using auto acknowledge).  We've checked that our receiver is not blocking, and the single receiver that is left does eventually process all of the messages.

The workaround is to periodically close the sessions that we have and create new ones.  This keeps all the listeners processing messages, but now we eventually run out of resources on the server due to the ever increasing number of threads.

Are we doing anything wrong in the code below?  At the moment, I would be happy if I could get rid of the MessageListener threads somehow as this solves our immediate problem of trying to keep multiple threads processing messages from the queue.  Longer term, it would be nice if we didn't have to do this either and the sessions would live indefinitely.

Any help appreciated.

David


  | public class TestMultiSessionMessageListener {
  | 
  |   /**
  |    * @param args
  |    */
  |   public static void main(String[] args) {
  |     TestMultiSessionMessageListener ml = new TestMultiSessionMessageListener();
  | 
  |     try {
  |       ml.test();
  |     } catch (JMSException e) {
  |       // TODO Auto-generated catch block
  |       e.printStackTrace();
  |     }
  |   }
  | 
  |   private void test() throws JMSException {
  |     Hashtable properties = new Hashtable();
  |     properties.put(Context.INITIAL_CONTEXT_FACTORY,
  |         "org.jnp.interfaces.NamingContextFactory");
  |     properties.put(Context.URL_PKG_PREFIXES,
  |         "org.jboss.naming:org.jnp.interfaces");
  |     properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  |     properties.put(Context.SECURITY_PRINCIPAL, "admin");
  |     properties.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  |     ConnectionFactory connectionFactory = null;
  | 
  |     try {
  |       Context context = new InitialContext(properties);
  |       connectionFactory = (ConnectionFactory) context
  |           .lookup("ConnectionFactory");
  |     } catch (NamingException ne) {
  |       throw new RuntimeException(ne);
  |     }
  | 
  |     Connection connection = connectionFactory.createConnection();
  |     connection.start();
  | 
  |     int numberOfProcesses = 1;
  | 
  |     MessageConsumer[] consumers = new MessageConsumer[numberOfProcesses];
  |     Session[] sessions = new Session[numberOfProcesses];
  | 
  |     while (true) {
  |       for (int i = 0; i < consumers.length; i++) {
  |         sessions = connection.createSession(false,
  |             Session.AUTO_ACKNOWLEDGE);
  |         Queue q = sessions.createQueue("publish.request");
  | 
  |         consumers = sessions.createConsumer(q);
  |         MessageListener ml = new MessageListener() {
  | 
  |           public void onMessage(Message arg0) {
  |             System.out.println(Thread.currentThread().getName());
  |           }
  | 
  |         };
  | 
  |         consumers.setMessageListener(ml);
  |       }
  |       try {
  |         Thread.sleep(2000);
  |       } catch (InterruptedException e) {
  |       }
  | 
  |       for (int i = 0; i < consumers.length; i++) {
  |         consumers.setMessageListener(null);
  |         try {
  |           sessions.setMessageListener(null);
  |         } catch (Exception e) {
  | 
  |         }
  |         consumers.close();
  |         consumers = null;
  |         sessions.close();
  |         sessions = null;
  |       }
  |     }
  |   }
  | }
  | 

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3961493#3961493

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=3961493



More information about the jboss-user mailing list