[jboss-user] [JBoss Messaging Users] - Re: message publishing to stopped topic

strzyga do-not-reply at jboss.com
Fri Nov 20 13:08:47 EST 2009


Thanks for the reply,
Here's the code for two MBeans - producer and consumer:

Producer:

  | package topictest;
  | 
  | import java.util.concurrent.atomic.AtomicLong;
  | 
  | import javax.annotation.Resource;
  | import javax.jms.JMSException;
  | import javax.jms.Session;
  | import javax.jms.TextMessage;
  | import javax.jms.Topic;
  | import javax.jms.TopicConnection;
  | import javax.jms.TopicConnectionFactory;
  | import javax.jms.TopicPublisher;
  | import javax.jms.TopicSession;
  | 
  | import org.apache.log4j.Logger;
  | import org.jboss.annotation.ejb.Service;
  | 
  | @Service(objectName = "test:service=Producer")
  | public class TestProducer implements TestProducerMBean {
  | 
  | 	@Resource(mappedName = "java:/JmsXA")
  | 	private TopicConnectionFactory tConnFactory;
  | 
  | 	private TopicSession tSession;
  | 	
  | 	private TopicConnection connection;
  | 
  | 	@Resource(mappedName = "/topic/test-Topic")
  | 	private Topic topic;
  | 
  | 	private TopicPublisher publisher;
  | 
  | 	private AtomicLong counter = new AtomicLong();
  | 
  | 	private static final Logger logger = Logger.getLogger(TestProducer.class);
  | 
  | 	@Override
  | 	public void send(Integer msgNum) throws JMSException {
  | 
  | 		for (int i = 0; (i < msgNum || -1 == msgNum); i++) {
  | 			String messageText = String.valueOf(counter.getAndIncrement());
  | 			TextMessage msg = tSession.createTextMessage(messageText);
  | 			publisher.publish(msg);
  | 			logger.info("Published " + messageText);
  | 
  | 			try {
  | 				Thread.sleep(500);
  | 			} catch (InterruptedException ignored) {
  | 			}
  | 		}
  | 	}
  | 
  | 	@Override
  | 	public void start() throws JMSException {
  | 		connection = tConnFactory.createTopicConnection();
  | 		tSession = connection.createTopicSession(false,
  | 				Session.AUTO_ACKNOWLEDGE);
  | 		publisher = tSession.createPublisher(topic);
  | 	}
  | 
  | 	@Override
  | 	public void stop() throws JMSException {
  | 		if (publisher != null) {
  | 			publisher.close();
  | 			tSession.close();
  | 			connection.close();
  | 		}
  | 	}
  | 
  | }
  | 

Management interface for Producer:

  | package topictest;
  | 
  | import javax.jms.JMSException;
  | 
  | import org.jboss.annotation.ejb.Management;
  | 
  | @Management
  | public interface TestProducerMBean {
  | 	void start() throws JMSException;
  | 	void stop() throws JMSException;
  | 	void send(Integer msgNum) throws JMSException;
  | }
  | 

Consumer:

  | package topictest;
  | 
  | import javax.annotation.Resource;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageListener;
  | import javax.jms.Session;
  | import javax.jms.TextMessage;
  | import javax.jms.Topic;
  | import javax.jms.TopicConnection;
  | import javax.jms.TopicConnectionFactory;
  | import javax.jms.TopicSession;
  | import javax.jms.TopicSubscriber;
  | 
  | import org.apache.log4j.Logger;
  | import org.jboss.annotation.ejb.Service;
  | 
  | @Service(objectName = "test:service=Consumer")
  | public class TestConsumer implements TestConsumerMBean, MessageListener{
  | 	
  | 	@Resource(mappedName = "java:/JmsXA")
  | 	private TopicConnectionFactory tConnFactory;
  | 	
  | 	@Resource(mappedName = "/topic/test-Topic")
  | 	private Topic topic;
  | 	
  | 	private TopicSession session;
  | 	private TopicConnection connection;
  | 	private TopicSubscriber subscriber;
  | 	
  | 	private static final Logger logger = Logger.getLogger(TestConsumer.class);
  | 	
  | 	@Override
  | 	public void create() throws JMSException {
  | 		connection = tConnFactory.createTopicConnection();
  | 		connection.setClientID("test-client");
  | 		session = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
  | 		subscriber = session.createDurableSubscriber(topic, "test-subscriber");
  | 		connection.start();
  | 	}
  | 
  | 	@Override
  | 	public void destroy() throws JMSException {
  | 		session.close();
  | 	}
  | 
  | 	@Override
  | 	public void start() throws JMSException {
  | 		startReceiving();
  | 	}
  | 	
  | 	@Override
  | 	public void startReceiving() throws JMSException {
  | 		subscriber.setMessageListener(this);
  | 	}
  | 
  | 	@Override
  | 	public void stopReceiving() throws JMSException {
  | 		subscriber.setMessageListener(null);
  | 	}
  | 
  | 	@Override
  | 	public void onMessage(Message arg0) {
  | 		TextMessage message = (TextMessage) arg0;
  | 		try {
  | 			logger.info("Received " + message.getText());
  | 		} catch (JMSException e) {
  | 			// TODO Auto-generated catch block
  | 			e.printStackTrace();
  | 		}
  | 	}
  | }
  | 

Management interface for Consumer:

  | package topictest;
  | 
  | import javax.jms.JMSException;
  | 
  | import org.jboss.annotation.ejb.Management;
  | 
  | @Management
  | public interface TestConsumerMBean {
  | 
  | 	void create() throws JMSException;
  | 	void destroy() throws JMSException;
  | 	void startReceiving() throws JMSException;
  | 	void stopReceiving() throws JMSException;
  | 	void start() throws JMSException;
  | }
  | 

Topic deployment descriptor that i'm using:

  | <?xml version="1.0" encoding="UTF-8"?>
  | <server>
  |         <mbean code="org.jboss.jms.server.destination.TopicService"
  |               name="test:service=Topic,name=test-Topic"
  |               xmbean-dd="xmdesc/Topic-xmbean.xml">
  |           <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  |           <depends>jboss.messaging:service=PostOffice</depends>
  |         </mbean>
  | </server>
  | 

And my JmsXA config:

  | <tx-connection-factory>
  |     <jndi-name>JmsXA</jndi-name>
  |     <xa-transaction/>
  |     <track-connection-by-tx/>
  |     <rar-name>jms-ra.rar</rar-name>
  |     <connection-definition>org.jboss.resource.adapter.jms.JmsConnectionFactory</connection-definition>
  |     <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
  |     <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
  | <config-property name="Strict" type="java.lang.Boolean">false</config-property>
  |     <max-pool-size>20</max-pool-size>
  |     <security-domain-and-application>JmsXARealm</security-domain-and-application>
  |   </tx-connection-factory>
  | 

How to reproduce:
1) find Producer in jmx-console and invoke it's send() operation, with argument specifying number of messages to publish; in jboss log you will see something like this:

  | 2009-11-20 18:57:03,498 INFO  [topictest.TestProducer] Published 0
  | 2009-11-20 18:57:03,499 INFO  [topictest.TestConsumer] Received 0
  | 2009-11-20 18:57:04,002 INFO  [topictest.TestProducer] Published 1
  | 2009-11-20 18:57:04,003 INFO  [topictest.TestConsumer] Received 1
  | 2009-11-20 18:57:04,504 INFO  [topictest.TestProducer] Published 2
  | 2009-11-20 18:57:04,505 INFO  [topictest.TestConsumer] Received 2
  | 
2) while Producer is publishing - go to test-Topic and invoke it's stop() operation; you will see something like this:

  | 2009-11-20 18:57:04,002 INFO  [topictest.TestProducer] Published 1
  | 2009-11-20 18:57:04,003 INFO  [topictest.TestConsumer] Received 1
  | 2009-11-20 18:57:04,504 INFO  [topictest.TestProducer] Published 2
  | 2009-11-20 18:57:04,505 INFO  [topictest.TestConsumer] Received 2
  | 2009-11-20 18:57:04,821 INFO  [org.jboss.jms.server.destination.TopicService] Topic[/topic/test-Topic] stopped
  | 2009-11-20 18:57:05,005 INFO  [topictest.TestProducer] Published 3
  | 2009-11-20 18:57:05,508 INFO  [topictest.TestProducer] Published 4
  | 2009-11-20 18:57:06,008 INFO  [topictest.TestProducer] Published 5
  | 
and there are no exceptions after "Topic stopped" log.
You can also stop consumer by invoking it's stopReceiving() operation and watch that, after stopping topic, messages are not persisted to DB. 

Does producer has to check in some way that topic is stopped and stop publishing?
Thanks

View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4266813#4266813

Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4266813



More information about the jboss-user mailing list