I've updated the test harness and can now reproduce both problems: the steadily
increasing number of threads and the NotFoundInDispatcherException. From my testing, it
seems that once a thread in the pool has received a message, it is never destroyed, but
it is also never allocated to another message listener. The number of threads therefore
builds up slowly over time, as the thread pool no longer seems to count these threads as
part of the pool. You can see this behaviour if you produce messages at a slower rate
than the consumers consume them. The threads that have consumed messages are never
destroyed, while those that haven't are destroyed after a certain amount of time
(it seems to be around 1 minute).

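For anyone who wants to watch the thread build-up while the harness runs, here is a minimal sketch of a monitor that groups live JVM threads by name prefix. The class and method names (ThreadCountMonitor, countByPrefix, startMonitor) are made up for illustration and are not part of the harness or of JBoss. It also assumes the pool threads share a name that ends in a number, which I haven't verified.

```java
import java.util.Map;
import java.util.TreeMap;

public class ThreadCountMonitor {

    // Counts live threads in this JVM, grouped by thread name with any
    // trailing number (and its separator) stripped, e.g. "Thread-12" -> "Thread".
    static Map<String, Integer> countByPrefix() {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            String prefix = t.getName().replaceAll("[-#\\s]*\\d+$", "");
            Integer old = counts.get(prefix);
            counts.put(prefix, old == null ? 1 : old + 1);
        }
        return counts;
    }

    // Starts a daemon thread that prints the grouped counts at a fixed interval.
    // Being a daemon, it does not keep the JVM alive on its own.
    static Thread startMonitor(final long intervalMillis) {
        Thread monitor = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        System.out.println("live threads: " + countByPrefix());
                        Thread.sleep(intervalMillis);
                    }
                } catch (InterruptedException e) {
                    // Interrupted: stop monitoring
                    Thread.currentThread().interrupt();
                }
            }
        }, "thread-count-monitor");
        monitor.setDaemon(true);
        monitor.start();
        return monitor;
    }

    public static void main(String[] args) {
        // Prints a single snapshot of the current JVM's threads
        System.out.println(countByPrefix());
    }
}
```

Calling ThreadCountMonitor.startMonitor(10000) at the top of the harness's test() method would print a snapshot every 10 seconds. If the behaviour described above holds, one group should keep growing across session recycles.
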
The NotFoundInDispatcherException seems to be caused by setting the message listener to
null to try to de-register it. According to the API documentation
http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/jms/MessageConsumer.h...
this should simply unset the listener for the message consumer. This doesn't worry me
much, as I'll simply stop doing it. I was only doing it to see whether it removed the
threads more cleanly.

You need to run this against a queue that already has lots of text messages in it for
the harness to consume.

I've made the harness do exactly what we're doing in our application, which is to
receive a message from one queue and place a message on another queue. In this case,
placing the message on the other queue doesn't seem to influence the behaviour
described above.

| import java.util.Hashtable;
|
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.JMSException;
| import javax.jms.Message;
| import javax.jms.MessageConsumer;
| import javax.jms.MessageListener;
| import javax.jms.MessageProducer;
| import javax.jms.Queue;
| import javax.jms.Session;
| import javax.jms.TextMessage;
| import javax.naming.Context;
| import javax.naming.InitialContext;
| import javax.naming.NamingException;
|
| public class TestMultiSessionMessageListener {
|
|     public static void main(String[] args) {
|         TestMultiSessionMessageListener ml = new TestMultiSessionMessageListener();
|
|         try {
|             ml.test();
|         } catch (Exception e) {
|             e.printStackTrace();
|         }
|     }
|
|     private void test() throws Exception {
|         // JNDI settings for the local JBoss naming service
|         Hashtable<String, String> properties = new Hashtable<String, String>();
|         properties.put(Context.INITIAL_CONTEXT_FACTORY,
|                 "org.jnp.interfaces.NamingContextFactory");
|         properties.put(Context.URL_PKG_PREFIXES,
|                 "org.jboss.naming:org.jnp.interfaces");
|         properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
|         properties.put(Context.SECURITY_PRINCIPAL, "admin");
|         properties.put(Context.SECURITY_CREDENTIALS, "admin");
|
|         ConnectionFactory connectionFactory = null;
|
|         try {
|             Context context = new InitialContext(properties);
|             connectionFactory = (ConnectionFactory) context
|                     .lookup("ConnectionFactory");
|         } catch (NamingException ne) {
|             throw new RuntimeException(ne);
|         }
|
|         Connection connection = connectionFactory.createConnection();
|         connection.start();
|
|         // We want to have 5 threads listening for messages
|         int numberOfProcesses = 5;
|
|         MessageConsumer[] consumers = new MessageConsumer[numberOfProcesses];
|         Session[] sessions = new Session[numberOfProcesses];
|
|         while (true) {
|             // Create one session, consumer and listener per process
|             for (int j = 0; j < consumers.length; j++) {
|                 sessions[j] = connection.createSession(false,
|                         Session.AUTO_ACKNOWLEDGE);
|                 final Queue sourceQ = sessions[j].createQueue("publish.request");
|                 final Queue destQ = sessions[j]
|                         .createQueue("publish.request.error");
|                 final Session session = sessions[j];
|
|                 consumers[j] = sessions[j].createConsumer(sourceQ);
|                 MessageListener ml = new MessageListener() {
|
|                     public void onMessage(Message msg) {
|                         try {
|                             String payload = ((TextMessage) msg).getText();
|                             System.out.println(Thread.currentThread().getName()
|                                     + " " + payload);
|                             // Forward the message to the other queue
|                             MessageProducer producer = session
|                                     .createProducer(destQ);
|                             producer.send(msg);
|                             try {
|                                 // Simulate normal processing time
|                                 Thread.sleep(500);
|                             } catch (InterruptedException e) {
|                                 e.printStackTrace();
|                             }
|                         } catch (JMSException e) {
|                             e.printStackTrace();
|                         }
|                     }
|                 };
|
|                 consumers[j].setMessageListener(ml);
|             }
|
|             // Recycle the sessions every 60 seconds
|             Thread.sleep(60000);
|             for (int j = 0; j < consumers.length; j++) {
|                 try {
|                     // This call seems to trigger the NotFoundInDispatcherException
|                     consumers[j].setMessageListener(null);
|                     consumers[j].close();
|                 } catch (Exception e) {
|                     // Log and carry on, as we are recycling it anyway
|                     e.printStackTrace();
|                 }
|                 consumers[j] = null;
|                 try {
|                     sessions[j].close();
|                 } catch (Exception e) {
|                     // Log and carry on, as we are recycling it anyway
|                     e.printStackTrace();
|                 }
|                 sessions[j] = null;
|             }
|         }
|     }
| }