[jboss-user] [JBoss Messaging] - Re: Failover failed

chrismeadows do-not-reply at jboss.com
Fri Dec 7 05:42:20 EST 2007


Hi Tim,

First of all, I have a large portion of humble pie to eat - I hadn't clustered the SLSB that publishes. However, I'm still seeing unreliable results, so I'll elaborate on what I am doing...

I have a POJO that looks up the ClusteredConnectionFactory and queue/testQueue via JNDI and connects to the queue:


  | 	private void startQueue() throws JMSException, NamingException {
  | 		conn = ((ConnectionFactory) ctx.lookup("ClusteredConnectionFactory"))
  | 				.createConnection("guest", "guest");
  | 		queue = (Queue) ctx.lookup("queue/testQueue");
  | 		session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
  | 
  | 		conn.start();
  | 	}
  | 
It sends (say) 1000 messages of about 1 KB each. Each message has an integer property 'TopicID', a random number from 1 to 50, which represents the index of the topic that the message will be forwarded to inside JBoss.
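A minimal sketch of how the per-message values described above could be generated, using only the JDK. The class and method names are hypothetical and not part of the original sender. The destination configuration further down names the topics testDurableTopic0 to testDurableTopic49, so the sketch takes the first index as a parameter rather than assuming 1 to 50 or 0 to 49.

```java
import java.util.Arrays;
import java.util.Random;

/** Hypothetical helper: chooses the values a test message would carry. */
final class TestMessageSpec {
    /** Payload size in characters; assumes roughly one byte per character for "1 KB". */
    static final int PAYLOAD_CHARS = 1024;

    /** Picks a topic index uniformly from [firstIndex, firstIndex + topicCount). */
    static int randomTopicId(Random rnd, int firstIndex, int topicCount) {
        return firstIndex + rnd.nextInt(topicCount);
    }

    /** Builds a filler payload of exactly PAYLOAD_CHARS characters. */
    static String payload() {
        char[] chars = new char[PAYLOAD_CHARS];
        Arrays.fill(chars, 'x');
        return new String(chars);
    }
}
```

The results would then be attached to the JMS message with `setIntProperty("TopicID", ...)` and `setStringProperty("Payload", ...)`, which are the two properties the publishing bean reads back.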

In my two-node JBoss cluster, I have an EJB3 MDB that listens to the queue, configured thus:


  | @Clustered
  | @MessageDriven(
  |     messageListenerInterface=MessageListener.class,
  | activationConfig = {
  |     @ActivationConfigProperty(propertyName="destination", propertyValue="queue/testQueue"),
  |     @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue")
  | })
  | public class OamServerPushImpl implements OamServerPush, MessageListener
  | 

This MDB simply forwards to an SLSB, configured as follows, which publishes the message to the topic indicated by the TopicID property:


  | @Clustered
  | @Stateless
  | public class PublisherImpl implements Publisher  {
  |     @Resource(mappedName="ClusteredConnectionFactory")
  |     ConnectionFactory topicConnectionFactory;
  | 
  |     @Resource
  |     private SessionContext sc;
  | 
  |     Topic topic;
  | 
  |     public void redispatch( Message m, String sReceiverAddress, int iMessageID) {
  |         Connection connection = null;
  |         Session session = null;
  |         TextMessage message = null;
  | 
  |         try {
  |             int iTopicID = m.getIntProperty("TopicID");
  |             String sPayload = m.getStringProperty("Payload");
  |             connection = topicConnectionFactory.createConnection();
  |             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  |             connection.start();
  | 
  |             message = session.createTextMessage();
  | 
  |             message.setText("Redispatching : " + sPayload);
  |             message.setStringProperty("ReceivedBy", sReceiverAddress);
  |             message.setStringProperty("PublishedBy", InetAddress.getLocalHost().getHostAddress());
  |             message.setIntProperty("MessageID", iMessageID);
  |             sendToClients(session, message, iTopicID);
  |         } catch (Throwable t) {
  |             // JMSException could be thrown
  |             logger.warn(
  |                     "PublisherBean.redispatch: " + "Exception: "
  |                     + t.toString());
  |             sc.setRollbackOnly();
  |         } finally {
  |             // Close the session first, then the connection, even on failure
  |             if (session != null) {
  |                 try {
  |                     session.close();
  |                 } catch (JMSException e) {
  |                 }
  |             }
  |             if (connection != null) {
  |                 try {
  |                     connection.close();
  |                 } catch (JMSException e) {
  |                 }
  |             }
  |         }
  |     }
  | 
  | 	public void sendToClients(Session session, Message message, int iTopicID)
  | 			throws JMSException {
  | 		topic = (Topic) sc.lookup("topic/testDurableTopic" + iTopicID);
  |         MessageProducer publisher = session.createProducer(topic);
  | 		publisher.send(message);
  | 		publisher.close();
  |         logger.info("Publisher BEAN: Message published: (" +
  |                 ((TextMessage)message).getText().length() +") bytes");
  | 	}
  | 


I then have another 50 POJOs outside the container, each listening to one topic via a durable subscription:


  | public class MessageSubscriberClient implements Runnable {
  | 	private static final Logger log = Logger.getLogger(MessageSubscriberClient.class);
  | 
  | 	int iCount = 0;
  | 	Connection conn;
  | 	Session session;
  | 	TopicSubscriber subscriber;
  | 	Topic topic;
  | 	Context ctx;
  | 	int i;
  | 	boolean bFinished = false;
  | 
  | 	static int NUMBER_OF_MESSAGES = 1000;
  | 
  | 	static final int NUMBER_OF_TOPICS = 50;
  | 
  | 
  | 	public MessageSubscriberClient(Context ctx, int i) {
  | 		this.i = i;
  | 		this.ctx = ctx;
  | 
  | 		try {
  | 			startTopic();
  | 		}
  | 		catch (JMSException e1) {
  | 			log.error(e1.getMessage(), e1);
  | 		}
  | 		catch (NamingException e) {
  | 			log.error(e.getMessage(), e);
  | 		}
  | 
  | 	}
  | 
  | 
  | 	public void run() {
  | 		try {
  | 			receive();
  | 		} catch (JMSException e) {
  | 			log.error(e.getMessage(), e);
  | 		} catch (NamingException e) {
  | 			log.error(e.getMessage(), e);
  | 		}
  | 		log.warn("Stopping topic on " + i);
  | 		stopTopic();
  | 	}
  | 
  | 	public void receive() throws JMSException, NamingException {
  | 		while ( ! bFinished) {
  | 			try {
  | 				if (subscriber != null) {
  | 					process();
  | 				}
  | 				else {
  |                                         // try to recover
  | 					log.info(i + " not well - subscriber null");
  | //					stopTopic();
  | //					startTopic();
  | 				}
  | 			} catch (JMSException e) {
  |                             // try to recover
  | //			   stopTopic();
  | //			   startTopic();
  | 			}
  | 		}
  | 		log.info("Finished receiving on " + i);
  | 
  | 	}
  | 
  | 
  | 	public int getNumReceivedMessages() {
  | 		return iCount;
  | 	}
  | 
  | 	private void startTopic() throws JMSException, NamingException {
  | 		conn = (Connection) ((ConnectionFactory) ctx
  | 				.lookup("ClusteredConnectionFactory")).createConnection("john" + i, "needle");
  | 
  | 		topic = (Topic) ctx.lookup("topic/testDurableTopic" + i);
  | 		session = conn.createSession(false,
  | 				TopicSession.AUTO_ACKNOWLEDGE);
  | 
  | 		conn.stop();
  | 
  | 		// subscription
  | 		subscriber = session.createDurableSubscriber(topic, "finbar" + i);
  | 
  | 		conn.start();
  | 
  | 	}
  | 
  | 	private void stopTopic() {
  | 		log.info("In Stopping topic on " + i);
  | 		try {
  | 			if (subscriber != null) {
  | 				subscriber.close();
  | 				log.info("finbar" + i + " closed");
  | 			}
  | 		} catch (JMSException e) {
  | 			log.error(e.getMessage(), e);
  | 		}
  | 		try {
  | 			if (session != null) {
  | 				session.unsubscribe("finbar" + i);
  | 				log.info("finbar" + i + " unsubscribed");
  | 			}
  | 		} catch (JMSException e) {
  | 			log.error(e.getMessage(), e);
  | 		}
  | 		try {
  | 			if (conn != null)
  | 				conn.close();
  | 		} catch (JMSException e) {
  | 			log.error(e.getMessage(), e);
  | 		}
  | 	}
  | 
  | 	public void setFinished( boolean b) {
  | 		bFinished = b;
  | 	}
  | 
  | 	public void process() throws JMSException {
  | 		Message m = subscriber.receive(1000);
  | 		TextMessage t = (TextMessage) m;
  | 		if (t != null) {
  | 			m.acknowledge();
  | 			iCount++;
  | 		}
  | 	}
  | }
  | 

In the receive method above, I've indicated where I've tried to detect receiver problems and recover by stopping and starting the topic.
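A minimal sketch of what a bounded stop/start recovery loop might look like, using only the JDK. Everything here is hypothetical: `Restartable` stands in for the `stopTopic()`/`startTopic()` pair, and the retry count and delays are arbitrary. It also assumes that `stop()` only closes the subscriber, session and connection. It should not call `session.unsubscribe(...)`, because in JMS that deletes the durable subscription's stored state.

```java
/** Hypothetical sketch of a bounded stop/start recovery loop. */
final class SubscriberRecovery {

    /** Stand-in for the stopTopic()/startTopic() pair of a subscriber client. */
    interface Restartable {
        void stop();
        void start() throws Exception;
    }

    /**
     * Tries stop-then-start up to maxAttempts times, doubling the pause after
     * each failure. Returns true once start() succeeds, false if every
     * attempt fails or the thread is interrupted while waiting.
     */
    static boolean recover(Restartable target, int maxAttempts, long initialDelayMs) {
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            target.stop();
            try {
                target.start();
                return true;
            } catch (Exception e) {
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delay);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                    delay *= 2;
                }
            }
        }
        return false;
    }
}
```

In the subscriber above, this would be called from the two "try to recover" spots in `receive()`. Bounding the attempts means a receiver that cannot reconnect reports failure instead of spinning in the while loop. The standard JMS hook for being told that a connection has failed is `Connection.setExceptionListener(...)`, which could trigger the same routine.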

I then run the receiver and sender code, wait until messages are happily being sent and received, and then kill the JBoss node (kill -9) that is processing the messages.

With the receiver code as-is above (not trying to recover), the threads just sit in the while loop reporting


  | 34864 [Thread-94] ERROR org.jboss.jms.client.container.ClosedInterceptor  - ClosedInterceptor.ClientConsumerDelegate[e9-cs3ijw9f-1-64tmiw9f-c5iuxj-r55ss4]: method receive() did not go through, the interceptor is CLOSED
  | javax.jms.IllegalStateException: The object is closed
  | 	at org.jboss.jms.client.container.ClosedInterceptor.invoke(ClosedInterceptor.java:157)
  | 	at org.jboss.aop.advice.PerInstanceInterceptor.invoke(PerInstanceInterceptor.java:105)
  | 	at org.jboss.jms.client.delegate.ClientConsumerDelegate$receive_N8299950230150603585.invokeNext(ClientConsumerDelegate$receive_N8299950230150603585.java)
  | 	at org.jboss.jms.client.delegate.ClientConsumerDelegate.receive(ClientConsumerDelegate.java)
  | 	at org.jboss.jms.client.JBossMessageConsumer.receive(JBossMessageConsumer.java:86)
  | 	at com.ipaccess.MessageSubscriberClient.process(MessageSubscriberClient.java:154)
  | 	at com.ipaccess.MessageSubscriberClient.receive(MessageSubscriberClient.java:78)
  | 	at com.ipaccess.MessageSubscriberClient.run(MessageSubscriberClient.java:64)
  | 	at java.lang.Thread.run(Thread.java:595)
  | 

If, on the other hand, I try to recover the topic by stopping and starting it, I get much 'better' results (I sometimes receive all messages), but more likely I find that some messages get stuck in JBoss. The receivers will sit there listening whilst the JBoss client-side code reports the failover (with a lot of JBoss Remoting logging that I realise is an outstanding JIRA task):


  | 294287 [Timer-0] WARN org.jboss.remoting.LeasePinger  - LeasePinger[SocketClientInvoker[1c0f2e5, bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed to ping to server: Can not get connection to server. Problem establishing socket connection for InvokerLocator [bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
  | 294584 [Thread-277] WARN org.jboss.remoting.LeasePinger  - LeasePinger[SocketClientInvoker[1c0f2e5, bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed sending disconnect for client lease for client with session ID 4ss15g-cvnwhk-f9wjxoka-1-f9wjxqil-1t
  | 

and 


  | - unable to get secondary locator
  | org.jboss.remoting.CannotConnectException: Can not get connection to server. Problem establishing socket connection for InvokerLocator [bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
  | 	at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:532)
  | 	at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:413)
  | 	at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
  | 	at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.getSecondaryLocator(BisocketClientInvoker.java:538)
  | 	at org.jboss.remoting.transport.bisocket.BisocketServerInvoker.createControlConnection(BisocketServerInvoker.java:228)
  | 	at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:402)
  | 	at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
  | 	at org.jboss.remoting.Client.invoke(Client.java:1634)
  | 	at org.jboss.remoting.Client.addCallbackListener(Client.java:1703)
  | 	at org.jboss.remoting.Client.addListener(Client.java:921)
  | 	at org.jboss.jms.client.remoting.JMSRemotingConnection.addInvokerCallbackHandler(JMSRemotingConnection.java:237)
  | 	at org.jboss.jms.client.remoting.JMSRemotingConnection.start(JMSRemotingConnection.java:312)
  | 	at org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate.establishCallback(ClientClusteredConnectionFactoryDelegate.java:99)
  | 	at org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler$CallbackConnectionListener.handleConnectionException(ConnectionFactoryCallbackHandler.java:105)
  | 	at org.jboss.remoting.ConnectionValidator$1.run(ConnectionValidator.java:452)
  | Caused by: java.net.ConnectException: Connection refused: connect
  | 	at java.net.PlainSocketImpl.socketConnect(Native Method)
  | 	at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:333)
  | 	at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:195)
  | 	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:182)
  | 	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
  | 	at java.net.Socket.connect(Socket.java:519)
  | 	at org.jboss.remoting.transport.socket.SocketClientInvoker.createSocket(SocketClientInvoker.java:187)
  | 	at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.createSocket(BisocketClientInvoker.java:420)
  | 	at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.getConnection(MicroSocketClientInvoker.java:815)
  | 	at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:525)
  | 	... 14 more
  | 

Eventually (over an hour later), most receivers reconnect, but some never do. I see


  | 1562035 [Thread-313] ERROR org.jboss.remoting.MicroRemoteClientInvoker  - error shutting down lease pinger
  | 

and then it all goes quiet, with no more logging. Despite leaving the sender/receivers and one JBoss node up for well over an hour, no more messages get through.

I wasn't expecting to have to stop and start the topics on the receivers, but it looks like I have to. Even if I do, it is likely that some receivers will not fail over.
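A minimal sketch of a per-topic tally that a test harness could keep to see which receivers are stuck, using only the JDK. The class name and methods are hypothetical. It assumes the sender records each TopicID it sends and each subscriber records each message it receives, and that topic indices run from 0 to topicCount - 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Hypothetical per-topic sent/received tally, safe to update from many threads. */
final class TopicTally {
    private final AtomicIntegerArray sent;
    private final AtomicIntegerArray received;

    TopicTally(int topicCount) {
        sent = new AtomicIntegerArray(topicCount);
        received = new AtomicIntegerArray(topicCount);
    }

    /** Called by the sender for every message it puts on the queue. */
    void recordSent(int topicId) {
        sent.incrementAndGet(topicId);
    }

    /** Called by a subscriber for every message it takes off its topic. */
    void recordReceived(int topicId) {
        received.incrementAndGet(topicId);
    }

    /** Returns the indices of topics that have received fewer messages than were sent. */
    List<Integer> laggingTopics() {
        List<Integer> lagging = new ArrayList<Integer>();
        for (int t = 0; t < sent.length(); t++) {
            if (received.get(t) < sent.get(t)) {
                lagging.add(t);
            }
        }
        return lagging;
    }
}
```

Checking `laggingTopics()` some time after the node is killed would show whether the missing messages are spread across all topics or concentrated on the receivers that never reconnected.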


Other info...

I run the sender and receivers from JUnit, but I think including that code in this post is unnecessary.

Inside JBoss, the destinations are configured like so


  | 
  |    <mbean code="org.jboss.jms.server.destination.QueueService"
  |       name="jboss.messaging.destination:service=Queue,name=testQueue"
  |       xmbean-dd="xmdesc/Queue-xmbean.xml">
  |       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  |       <depends>jboss.messaging:service=PostOffice</depends>
  |       <attribute name="Clustered">true</attribute>
  |       <attribute name="SecurityConfig">
  |          <security>
  |             <role name="guest" read="true" write="true"/>
  |             <role name="publisher" read="true" write="true" create="false"/>
  |             <role name="noacc" read="false" write="false" create="false"/>
  |          </security>
  |       </attribute>
  |    </mbean>
  | 
  |   <!-- Repeat for testDurableTopic0 to testDurableTopic49 -->
  |   <mbean code="org.jboss.jms.server.destination.TopicService"	name="jboss.messaging.destination:service=Topic,name=testDurableTopic0"
  |       xmbean-dd="xmdesc/Topic-xmbean.xml">
  |       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  |       <depends>jboss.messaging:service=PostOffice</depends>
  |       <attribute name="Clustered">true</attribute>
  |     <attribute name="SecurityConfig">
  |        <security>
  |          <role name="guest" read="true" write="true"/>
  |          <role name="publisher" read="true" write="true" create="false"/>
  |          <role name="durpublisher" read="true" write="true" create="true"/>
  |       </security>
  |     </attribute>
  |   </mbean>
  | 

I have a mysql-persistence-service.xml with the appropriate Clustered attribute and the SQL to set up the users, passwords, roles and ClientIDs:


  | <attribute name="Clustered">true</attribute>
  | ...
  | POPULATE.TABLES.69 = INSERT INTO JBM_USER (USER_ID,PASSWD,CLIENTID) VALUES ('john49', 'needle', 'DurableSubscriberExample49')
  | POPULATE.TABLES.119 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('durpublisher','john49')
  | 



I also set ServerPeerID to 0 and 1 respectively in messaging-service.xml for the two nodes.

JBoss is running on RHEL 4 JRE 1.5.0_12
JBoss 4.2.2 GA with JBM 1.4.0.SP1 configured as per documentation.
Pojo sender and receiver are running on WinXP JRE 1.5.0_12.



View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4111165#4111165
