[jboss-user] [JBoss Messaging] - Re: Removing a MessageListener doesn't remove its associated

davidrh do-not-reply at jboss.com
Wed Aug 2 03:04:48 EDT 2006


I have enhanced my test harness again and have managed to reproduce the problem of the listeners dying.  I have made the harness emulate pretty much exactly what we do in our application for receiving messages.  Our application receives messages from multiple queues, and we have a manager thread for each queue which manages the process of connecting to the queue and setting up the right number of message listeners.  In my example, I have set it up so that the first queue moves its messages to the second queue, the second queue to the third queue and the third queue moves them back to the first queue.  In this way, it generates a constant flow of messages for the test.

When I ran this, after an hour and a half I get the following output in the console:


  | Checking core.internal.bulkUpload
  | Checking core.internal.bulkUpload.error
  | Checking billing.response
  | Checking core.internal.publication
  | Checking publish.request
  | Checking billing.response.error
  | Checking core.track.error
  | Checking core.track
  | ********** core.track.2 has stopped **********
  | ********** core.track.3 has stopped **********
  | ********** core.track.4 has stopped **********
  | 

After four hours, some more stop:


  | Checking core.internal.bulkUpload
  | Checking billing.response
  | Checking core.track
  | ********** core.track.2 has stopped **********
  | ********** core.track.3 has stopped **********
  | ********** core.track.4 has stopped **********
  | Checking billing.response.error
  | Checking core.internal.publication
  | ********** core.internal.publication.4 has stopped **********
  | ********** core.internal.publication.6 has stopped **********
  | ********** core.internal.publication.7 has stopped **********
  | ********** core.internal.publication.8 has stopped **********
  | ********** core.internal.publication.9 has stopped **********
  | Checking core.track.error
  | Checking publish.request
  | 

If I go into the debugger in Eclipse and pause these threads, I get the following stack trace:


  | Thread [core.track.4] (Suspended)
  | 	Object.wait(long) line: not available [native method]
  | 	Object.wait() line: 429
  | 	LinkedQueue.take() line: not available
  | 	QueuedExecutor$RunLoop.run() line: not available
  | 	Thread.run() line: 534
  | 

I've checked in the JMX console, and the core.track queue has over 9,000 messages waiting to be processed, so there is plenty for these threads to do. The core.internal.publication queue had 280 messages, so there still should be work for the listeners that have stopped.

The new test harness is below.  I've created the same number of listeners for all of our queues as we have in our application.  These queues are configured in JBoss as follows:

publish.request FullSize=10000  PageSize=2000  DownCacheSize=2000
core.track FullSize=10000  PageSize=2000  DownCacheSize=2000
core.internal.publication FullSize=10000  PageSize=2000  DownCacheSize=2000
core.track.error FullSize=10000  PageSize=2000  DownCacheSize=2000
billing.response FullSize=10000  PageSize=2000  DownCacheSize=2000
billing.response.error FullSize=2000  PageSize=200  DownCacheSize=200
core.internal.bulkUpload FullSize=100  PageSize=10  DownCacheSize=10
core.internal.bulkUpload.error FullSize=100  PageSize=10  DownCacheSize=10

A template of our queue configuration is as follows:


  |    <mbean code="org.jboss.jms.server.destination.Queue"
  |       name="jboss.messaging.destination:service=Queue,name=core.track"
  |       xmbean-dd="xmdesc/Queue-xmbean.xml">
  |       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  |       <attribute name="SecurityConfig">
  |          <security>
  |             <role name="guest" read="true" write="true"/>
  |             <role name="publisher" read="true" write="true" create="false"/>
  |             <role name="noacc" read="false" write="false" create="false"/>
  |          </security>
  |       </attribute>
  |       <attribute name="FullSize">10000</attribute>
  |       <attribute name="PageSize">2000</attribute>
  |       <attribute name="DownCacheSize">2000</attribute>
  |    </mbean>
  | 

To run the harness, I start with a new instance of JBoss Messaging and seed the publish.request queue with lots (10,000+) of text messages.  I then start the harness which simply moves the messages between the publish.request, core.internal.publication and core.track queues.  I don't put any messages in the other queues, as they don't have messages in them when our application fails.  I then let the harness run, until it displays the queue has stopped messages.


  | public class TestMultiSessionMessageListener {
  | 
  |   public static void main(String[] args) {
  |     TestMultiSessionMessageListener ml = new TestMultiSessionMessageListener();
  | 
  |     try {
  |       ml.test();
  |     } catch (Exception e) {
  |       e.printStackTrace();
  |     }
  |   }
  | 
  |   private void test() throws Exception {
  |     boolean printMessages = false;
  |     
  |     createListeners("publish.request", "core.track", 5, 250, printMessages);
  |     Thread.sleep(2000);
  |     createListeners("core.track", "core.internal.publication", 5, 250, printMessages);
  |     Thread.sleep(2000);
  |     createListeners("core.internal.publication", "publish.request", 10, 750, printMessages);
  |     Thread.sleep(2000);
  |     createListeners("core.track.error", "core.track", 2, 100, printMessages);
  |     Thread.sleep(2000);
  |     createListeners("billing.response", "publish.request", 2, 100, printMessages);
  |     Thread.sleep(2000);
  |     createListeners("billing.response.error", "billing.response", 2, 100, printMessages);
  |     Thread.sleep(2000);
  |     createListeners("core.internal.bulkUpload", "publish.request", 3, 100, printMessages);
  |     Thread.sleep(2000);
  |     createListeners("core.internal.bulkUpload.error",
  |         "core.internal.bulkUpload", 1, 100, printMessages);
  |     Thread.sleep(Long.MAX_VALUE);
  |   }
  | 
  |   private void createListeners(final String receiveQueue,
  |       final String sendQueue, final int numberOfProcesses,
  |       final long delay, final boolean printMessages) throws Exception {
  | 
  |     Runnable runnable = new Runnable() {
  | 
  |       public void run() {
  |         Hashtable properties = new Hashtable();
  |         properties.put(Context.INITIAL_CONTEXT_FACTORY,
  |             "org.jnp.interfaces.NamingContextFactory");
  |         properties.put(Context.URL_PKG_PREFIXES,
  |             "org.jboss.naming:org.jnp.interfaces");
  |         properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  |         properties.put(Context.SECURITY_PRINCIPAL, "admin");
  |         properties.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  |         ConnectionFactory connectionFactory = null;
  | 
  |         try {
  |           Context context = new InitialContext(properties);
  |           connectionFactory = (ConnectionFactory) context
  |               .lookup("ConnectionFactory");
  |           Connection connection = connectionFactory
  |               .createConnection();
  |           connection.start();
  | 
  |           MessageConsumer[] consumers = new MessageConsumer[numberOfProcesses];
  |           Session[] sessions = new Session[numberOfProcesses];
  |           MessageListener[] listeners = new MessageListener[numberOfProcesses];
  |           final long[] lastReceived = new long[numberOfProcesses];
  | 
  |           for (int j = 0; j < consumers.length; j++) {
  |             sessions[j] = connection.createSession(false,
  |                 Session.AUTO_ACKNOWLEDGE);
  |             final Queue sourceQ = sessions[j]
  |                 .createQueue(receiveQueue);
  |             final Queue destQ = sessions[j].createQueue(sendQueue);
  |             final Session session = sessions[j];
  | 
  |             consumers[j] = sessions[j].createConsumer(sourceQ);
  |             lastReceived[j] = 0;
  |             final int count = j;
  | 
  |             listeners[j] = new MessageListener() {
  |               public String threadName;
  | 
  |               private boolean threadNameChanged = false;
  | 
  |               public void onMessage(Message msg) {
  |                 lastReceived[count] = System
  |                     .currentTimeMillis();
  |                 try {
  |                   if (!threadNameChanged) {
  |                     threadName = receiveQueue + "." + count;
  |                     Thread.currentThread().setName(
  |                         threadName);
  |                     threadNameChanged = true;
  |                   }
  |                   String payload = ((TextMessage) msg)
  |                       .getText();
  |                   if (printMessages) {
  |                     System.out.println(Thread
  |                         .currentThread().getName()
  |                         + " " + payload);
  |                   }
  |                   MessageProducer producer = session
  |                       .createProducer(destQ);
  |                   try {
  |                     // Simulate normal processing time
  |                     Thread.currentThread().sleep(delay);
  |                   } catch (InterruptedException e) {
  |                     e.printStackTrace();
  |                   }
  |                   producer.send(msg);
  |                   producer.close();
  |                 } catch (JMSException e) {
  |                   e.printStackTrace();
  |                 }
  |               }
  |             };
  | 
  |             consumers[j].setMessageListener(listeners[j]);
  |           }
  |           // Thread.currentThread().sleep(Long.MAX_VALUE);
  |           while (true) {
  |             Thread.currentThread().sleep(60000);
  |             System.out.println("Checking " + receiveQueue);
  |             long currentTime = System.currentTimeMillis();
  |             for (int j = 0; j < numberOfProcesses; j++) {
  |               if (lastReceived[j] > 0 && currentTime - lastReceived[j] > 60000) {
  |                 System.out.println("********** " + receiveQueue + "."
  |                     + j + " has stopped **********");
  |               }
  |             }
  |           }
  |         } catch (Exception e) {
  |           throw new RuntimeException(e);
  |         }
  |       }
  |     };
  | 
  |     Thread t = new Thread(runnable);
  |     t.setName("Manager " + receiveQueue);
  |     t.start();
  |   }
  | }
  | 


View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3962366#3962366

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=3962366



More information about the jboss-user mailing list