[jboss-user] [JBoss Messaging] - Re: Removing a MessageListener doesn't remove its associated

davidrh do-not-reply at jboss.com
Mon Jul 31 01:20:00 EDT 2006


I've updated the test harness and I can reproduce both problems: the increasing number of threads and the NotFoundInDispatcherException. From my testing it seems that once a thread in the pool has received a message, it is never destroyed, but it is also never allocated to another message listener. The thread pool no longer seems to count these threads as part of the pool, so the number of threads slowly builds up over time. You can see this behaviour if you produce messages at a slower rate than the consumers consume them. The threads that have consumed messages are never destroyed, while those that haven't are destroyed after a certain idle time (it seems to be around 1 minute).
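One way to watch the build-up is to sample the live threads in the same JVM as the harness and count them by name. The following is only a minimal sketch: the class ThreadCountMonitor and its methods are hypothetical helpers, not part of JBoss Messaging, and grouping threads by stripping a trailing number from the name is an assumption about how the pool names its threads. It uses only java.lang and java.util, and needs Java 8 or later.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for observing thread growth; not part of JBoss Messaging.
public class ThreadCountMonitor {

    // Assumption: pooled threads share a name that differs only by a trailing
    // number, so "Thread-7" and "Thread-12" both map to the group "Thread".
    static String groupKey(String threadName) {
        return threadName.replaceAll("[-\\s#:]*\\d+$", "");
    }

    // Count the live threads of this JVM, grouped by name prefix.
    static Map<String, Integer> countByGroup() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(groupKey(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    // Start a daemon thread that prints the grouped counts every intervalMs.
    static Thread startSampler(final long intervalMs) {
        Thread sampler = new Thread(() -> {
            try {
                while (true) {
                    System.out.println(System.currentTimeMillis() + " " + countByGroup());
                    Thread.sleep(intervalMs);
                }
            } catch (InterruptedException e) {
                // Restore the flag and let the sampler thread end.
                Thread.currentThread().interrupt();
            }
        }, "thread-count-sampler");
        sampler.setDaemon(true);
        sampler.start();
        return sampler;
    }

    // Standalone demo: sample this JVM's own threads for a few seconds.
    public static void main(String[] args) throws InterruptedException {
        startSampler(1000);
        Thread.sleep(3000);
    }
}
```

To use it with the harness, call ThreadCountMonitor.startSampler(10000) at the start of test(). If the behaviour described above is present, the count for the pool's thread group should keep rising across the 60-second recycles instead of levelling off.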

The NotFoundInDispatcherException seems to be caused by setting the message listener to null to try and de-register it.  According to the API http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/MessageConsumer.html#setMessageListener(javax.jms.MessageListener) this should have the effect of unsetting the listener for the message consumer. This doesn't worry me much, as I'll simply stop doing it - I was only doing it to see if it removed the threads more cleanly.

You need to run this against a queue (publish.request in the code below) that already holds lots of text messages for the harness to consume.

I've made the harness do exactly what we're doing in our application, which is receive a message from one queue and place a message on another queue.  In this case, the placing of the message on the other queue doesn't seem to influence the above behaviour.


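  | import java.util.Hashtable;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.jms.TextMessage;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | import javax.naming.NamingException;
  | 
  | public class TestMultiSessionMessageListener {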
  | 
  |   public static void main(String[] args) {
  |     TestMultiSessionMessageListener ml = new TestMultiSessionMessageListener();
  | 
  |     try {
  |       ml.test();
  |     } catch (Exception e) {
  |       e.printStackTrace();
  |     }
  |   }
  | 
  |   private void test() throws Exception {
  |     Hashtable properties = new Hashtable();
  |     properties.put(Context.INITIAL_CONTEXT_FACTORY,
  |         "org.jnp.interfaces.NamingContextFactory");
  |     properties.put(Context.URL_PKG_PREFIXES,
  |         "org.jboss.naming:org.jnp.interfaces");
  |     properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  |     properties.put(Context.SECURITY_PRINCIPAL, "admin");
  |     properties.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  |     ConnectionFactory connectionFactory = null;
  | 
  |     try {
  |       Context context = new InitialContext(properties);
  |       connectionFactory = (ConnectionFactory) context
  |           .lookup("ConnectionFactory");
  |     } catch (NamingException ne) {
  |       throw new RuntimeException(ne);
  |     }
  | 
  |     Connection connection = connectionFactory.createConnection();
  |     connection.start();
  | 
  |     // We want to have 5 threads listening for messages
  |     int numberOfProcesses = 5;
  | 
  |     MessageConsumer[] consumers = new MessageConsumer[numberOfProcesses];
  |     Session[] sessions = new Session[numberOfProcesses];
  | 
  |     while (true) {
  |       for (int j = 0; j < consumers.length; j++) {
  |         sessions[j] = connection.createSession(false,
  |             Session.AUTO_ACKNOWLEDGE);
  |         final Queue sourceQ = sessions[j].createQueue("publish.request");
  |         final Queue destQ = sessions[j]
  |             .createQueue("publish.request.error");
  |         final Session session = sessions[j];
  | 
  |         consumers[j] = sessions[j].createConsumer(sourceQ);
  |         MessageListener ml = new MessageListener() {
  | 
  |           public void onMessage(Message msg) {
  |             try {
  |               String payload = ((TextMessage) msg).getText();
  |               System.out.println(Thread.currentThread().getName()
  |                   + " " + payload);
  |               MessageProducer producer = session
  |                   .createProducer(destQ);
  |               producer.send(msg);
  |               try {
  |                 // Simulate normal processing time
  |                 Thread.sleep(500);
  |               } catch (InterruptedException e) {
  |                 e.printStackTrace();
  |               }
  |             } catch (JMSException e) {
  |               e.printStackTrace();
  |             }
  |           }
  |         };
  | 
  |         consumers[j].setMessageListener(ml);
  |       }
  | 
  |       // Recycle the sessions every 60 seconds
  |       Thread.sleep(60000);
  |       for (int j = 0; j < consumers.length; j++) {
  |         try {
  |           // Setting the listener to null is what seems to trigger
  |           // the NotFoundInDispatcherException
  |           consumers[j].setMessageListener(null);
  |           consumers[j].close();
  |         } catch (Exception e) {
  |           // Ignore as we are recycling it anyway
  |           e.printStackTrace();
  |         }
  |         consumers[j] = null;
  |         try {
  |           sessions[j].close();
  |         } catch (Exception e) {
  |           // Ignore as we are recycling it anyway
  |           e.printStackTrace();
  |         }
  |         sessions[j] = null;
  |       }
  |     }
  |   }
  | }
  | 

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3961768#3961768
