[jboss-user] [JBoss Messaging] - Re: Failover failed
chrismeadows
do-not-reply at jboss.com
Fri Dec 7 05:42:20 EST 2007
Hi Tim,
First of all, I have a large portion of humble pie to eat - I hadn't clustered the SLSB that publishes. However, I'm still seeing unreliable results, so I'll elaborate on what I am doing...
I have a pojo that connects via JNDI to queue/testQueue on the ClusteredConnectionFactory.
| /**
|  * Opens the sending side: connects to ClusteredConnectionFactory as guest/guest,
|  * resolves queue/testQueue, creates a non-transacted AUTO_ACKNOWLEDGE session and
|  * starts the connection. Results are stored in the conn, queue and session fields.
|  */
| private void startQueue() throws JMSException, NamingException {
|     ConnectionFactory clusteredFactory =
|             (ConnectionFactory) ctx.lookup("ClusteredConnectionFactory");
|     conn = clusteredFactory.createConnection("guest", "guest");
|     queue = (Queue) ctx.lookup("queue/testQueue");
|     session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|     conn.start();
| }
|
It sends (say) 1000 messages of 1 KB each. Each message has an integer property 'TopicID', a random number from 1 to 50, which is the index of the topic that the message will be forwarded to inside JBoss.
In my two node JBoss cluster, I have an EJB3 MDB that is listening to the queue configured thus:
| @Clustered
| @MessageDriven(
| messageListenerInterface=MessageListener.class,
| activationConfig = {
| @ActivationConfigProperty(propertyName="destination", propertyValue="queue/testQueue"),
| @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue")
| })
| public class OamServerPushImpl implements OamServerPush, MessageListener
|
This MDB simply forwards onto a SLSB configured as follows, which publishes the message to the topic indicated by the TopicID property :
| @Clustered
| @Stateless
| public class PublisherImpl implements Publisher {
| @Resource(mappedName="ClusteredConnectionFactory")
| ConnectionFactory topicConnectionFactory;
|
| @Resource
| private SessionContext sc;
|
| Topic topic;
|
| /**
|  * Republishes the "Payload" string property of {@code m} as a new TextMessage on the
|  * topic selected by the message's "TopicID" int property (see sendToClients).
|  *
|  * Any failure is logged and the current EJB transaction is marked rollback-only.
|  * The session and connection are closed on every path, including failures.
|  * NOTE(review): the connection comes from ClusteredConnectionFactory, which is presumably
|  * not a JCA/XA-managed factory, so the publish is likely not enlisted in the transaction
|  * that setRollbackOnly() affects - confirm.
|  *
|  * @param m                received message carrying the TopicID and Payload properties
|  * @param sReceiverAddress copied into the "ReceivedBy" property of the new message
|  * @param iMessageID       copied into the "MessageID" property of the new message
|  */
| public void redispatch( Message m, String sReceiverAddress, int iMessageID) {
|     // Local, not a field: each call owns and must release its own connection.
|     javax.jms.Connection connection = null;
|     Session session = null;
|     TextMessage message = null;
|
|     try {
|         int iTopicID = m.getIntProperty("TopicID");
|         String sPayload = m.getStringProperty("Payload");
|         connection = topicConnectionFactory.createConnection();
|         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|         connection.start();
|
|         message = session.createTextMessage();
|
|         message.setText("Redispatching : " + sPayload);
|         message.setStringProperty("ReceivedBy", sReceiverAddress);
|         message.setStringProperty("PublishedBy", InetAddress.getLocalHost().getHostAddress());
|         message.setIntProperty("MessageID", iMessageID);
|         sendToClients(session, message, iTopicID);
|     } catch (Throwable t) {
|         // JMSException could be thrown; keep the stack trace for diagnosis.
|         logger.warn(
|                 "PublisherBean.redispatch: " + "Exception: "
|                 + t.toString(), t);
|         sc.setRollbackOnly();
|     } finally {
|         // Previously the connection was closed only on success, leaking one per failure.
|         if (session != null) {
|             try {
|                 session.close();
|             } catch (JMSException e) {
|                 // Best-effort cleanup; closing the connection below also closes the session.
|                 logger.debug("PublisherBean.redispatch: failed to close session", e);
|             }
|         }
|         if (connection != null) {
|             try {
|                 connection.close();
|             } catch (JMSException e) {
|                 logger.debug("PublisherBean.redispatch: failed to close connection", e);
|             }
|         }
|     }
| }
|
| /**
|  * Publishes {@code message} on topic/testDurableTopic{iTopicID}, looked up through the
|  * bean's SessionContext, using a short-lived producer created on {@code session}.
|  *
|  * @param session   open JMS session used to create the producer
|  * @param message   message to publish; logged via a TextMessage cast, so it must be a TextMessage
|  * @param iTopicID  numeric suffix of the destination topic name
|  * @throws JMSException if producer creation or the send fails
|  */
| public void sendToClients(Session session, Message message, int iTopicID)
|         throws JMSException {
|     topic = (Topic) sc.lookup("topic/testDurableTopic" + iTopicID);
|     MessageProducer publisher = session.createProducer(topic);
|     try {
|         publisher.send(message);
|     } finally {
|         // Release the producer even when send() throws.
|         publisher.close();
|     }
|     // NOTE: getText().length() is a character count, not a byte count.
|     logger.info("Publisher BEAN: Message published: (" +
|             ((TextMessage)message).getText().length() +") bytes");
| }
|
I then have another 50 pojos outside the container each listening to one topic via a durable subscription
| public class MessageSubscriberClient implements Runnable {
| private static final Logger log = Logger.getLogger(MessageSubscriberClient.class);
|
| int iCount = 0;
| Connection conn;
| Session session;
| TopicSubscriber subscriber;
| Topic topic;
| Context ctx;
| int i;
| boolean bFinished = false;
|
| static int NUMBER_OF_MESSAGES = 1000;
|
| static final int NUMBER_OF_TOPICS = 50;
|
|
| public MessageSubscriberClient(Context ctx, int i) {
| this.i = i;
| this.ctx = ctx;
|
| try {
| startTopic();
| }
| catch (JMSException e1) {
| log.error(e1.getMessage(), e1);
| }
| catch (NamingException e) {
| log.error(e.getMessage(), e);
| }
|
| }
|
|
| public void run() {
| try {
| receive();
| } catch (JMSException e) {
| log.error(e.getMessage(), e);
| } catch (NamingException e) {
| log.error(e.getMessage(), e);
| }
| log.warn("Stopping topic on " + i);
| stopTopic();
| }
|
| public void receive() throws JMSException, NamingException {
| while ( ! bFinished) {
| try {
| if (subscriber != null) {
| process();
| }
| else {
| // try to recover
| log.info(i + " not well - subscriber null");
| // stopTopic();
| // startTopic();
| }
| } catch (JMSException e) {
| // try to recover
| // stopTopic();
| // startTopic();
| }
| }
| log.info("Finished receiving on " + i);
|
| }
|
|
| public int getNumReceivedMessages() {
| return iCount;
| }
|
| private void startTopic() throws JMSException, NamingException {
| conn = (Connection) ((ConnectionFactory) ctx
| .lookup("ClusteredConnectionFactory")).createConnection("john" + i, "needle");
|
| topic = (Topic) ctx.lookup("topic/testDurableTopic" + i);
| session = conn.createSession(false,
| TopicSession.AUTO_ACKNOWLEDGE);
|
| conn.stop();
|
| // subscription
| subscriber = session.createDurableSubscriber(topic, "finbar" + i);
|
| conn.start();
|
| }
|
| private void stopTopic() {
| log.info("In Stopping topic on " + i);
| try {
| if (subscriber != null) {
| subscriber.close();
| log.info("finbar" + i + " closed");
| }
| } catch (JMSException e) {
| log.error(e.getMessage(), e);
| }
| try {
| if (session != null) {
| session.unsubscribe("finbar" + i);
| log.info("finbar" + i + " unsubscribed");
| }
| } catch (JMSException e) {
| log.error(e.getMessage(), e);
| }
| try {
| if (conn != null)
| conn.close();
| } catch (JMSException e) {
| log.error(e.getMessage(), e);
| }
| }
|
| public void setFinished( boolean b) {
| bFinished = b;
| }
|
| public void process() throws JMSException {
| Message m = subscriber.receive(1000);
| TextMessage t = (TextMessage) m;
| if (t != null) {
| m.acknowledge();
| iCount++;
| }
| }
| }
|
In the receive method above, I've indicated where I've tried to detect receiver problems and recover by stopping and starting the topic.
I then run the receiver and sender code, wait until messages are happily being sent and received, and then kill the JBoss node (kill -9) that is processing the messages.
With the receiver code as-is above (not trying to recover), the threads just sit in the while loop reporting
| 34864 [Thread-94] ERROR org.jboss.jms.client.container.ClosedInterceptor - ClosedInterceptor.ClientConsumerDelegate[e9-cs3ijw9f-1-64tmiw9f-c5iuxj-r55ss4]: method receive() did not go through, the interceptor is CLOSED
| javax.jms.IllegalStateException: The object is closed
| at org.jboss.jms.client.container.ClosedInterceptor.invoke(ClosedInterceptor.java:157)
| at org.jboss.aop.advice.PerInstanceInterceptor.invoke(PerInstanceInterceptor.java:105)
| at org.jboss.jms.client.delegate.ClientConsumerDelegate$receive_N8299950230150603585.invokeNext(ClientConsumerDelegate$receive_N8299950230150603585.java)
| at org.jboss.jms.client.delegate.ClientConsumerDelegate.receive(ClientConsumerDelegate.java)
| at org.jboss.jms.client.JBossMessageConsumer.receive(JBossMessageConsumer.java:86)
| at com.ipaccess.MessageSubscriberClient.process(MessageSubscriberClient.java:154)
| at com.ipaccess.MessageSubscriberClient.receive(MessageSubscriberClient.java:78)
| at com.ipaccess.MessageSubscriberClient.run(MessageSubscriberClient.java:64)
| at java.lang.Thread.run(Thread.java:595)
|
If, on the other hand, I try to recover the topic by stopping and starting it, I get much 'better' results (I sometimes receive all messages), but more likely I find that some messages get stuck in JBoss. The receivers sit there listening while the JBoss client-side code reports the failover (with a lot of JBoss Remoting logging that I realise is an outstanding JIRA task):
| 294287 [Timer-0] WARN org.jboss.remoting.LeasePinger - LeasePinger[SocketClientInvoker[1c0f2e5, bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed to ping to server: Can not get connection to server. Problem establishing socket connection for InvokerLocator [bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize
| =200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&
| dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOf
| CallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&
| socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
| 294584 [Thread-277] WARN org.jboss.remoting.LeasePinger - LeasePinger[SocketClientInvoker[1c0f2e5, bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed sending disconnect for client lease for client with session ID 4ss15g-cvnwhk-f9wjxoka-1-f9wjxqil-1t
|
and
| - unable to get secondary locator
| org.jboss.remoting.CannotConnectException: Can not get connection to server. Problem establishing socket connection for InvokerLocator [bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.
| jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&
| numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_
| connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
| at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:532)
| at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:413)
| at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
| at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.getSecondaryLocator(BisocketClientInvoker.java:538)
| at org.jboss.remoting.transport.bisocket.BisocketServerInvoker.createControlConnection(BisocketServerInvoker.java:228)
| at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:402)
| at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
| at org.jboss.remoting.Client.invoke(Client.java:1634)
| at org.jboss.remoting.Client.addCallbackListener(Client.java:1703)
| at org.jboss.remoting.Client.addListener(Client.java:921)
| at org.jboss.jms.client.remoting.JMSRemotingConnection.addInvokerCallbackHandler(JMSRemotingConnection.java:237)
| at org.jboss.jms.client.remoting.JMSRemotingConnection.start(JMSRemotingConnection.java:312)
| at org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate.establishCallback(ClientClusteredConnectionFactoryDelegate.java:99)
| at org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler$CallbackConnectionListener.handleConnectionException(ConnectionFactoryCallbackHandler.java:105)
| at org.jboss.remoting.ConnectionValidator$1.run(ConnectionValidator.java:452)
| Caused by: java.net.ConnectException: Connection refused: connect
| at java.net.PlainSocketImpl.socketConnect(Native Method)
| at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:333)
| at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:195)
| at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:182)
| at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
| at java.net.Socket.connect(Socket.java:519)
| at org.jboss.remoting.transport.socket.SocketClientInvoker.createSocket(SocketClientInvoker.java:187)
| at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.createSocket(BisocketClientInvoker.java:420)
| at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.getConnection(MicroSocketClientInvoker.java:815)
| at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:525)
| ... 14 more
|
Eventually (over an hour later), most receivers reconnect, but some never do. I see
| 1562035 [Thread-313] ERROR org.jboss.remoting.MicroRemoteClientInvoker - error shutting down lease pinger
|
and then it all goes quiet - no more logging. Despite leaving the sender/receivers and one JBoss node up for well over an hour, no more messages get through.
I wasn't expecting to have to stop and start the topics on the receivers, but it looks like I have to. Even if I do, it is likely that some receivers will not fail over.
Other info...
I execute sender and receivers from JUnit, but I think putting it in this post is unnecessary
Inside JBoss, the destinations are configured like so
|
| <mbean code="org.jboss.jms.server.destination.QueueService"
|    name="jboss.messaging.destination:service=Queue,name=testQueue"
|    xmbean-dd="xmdesc/Queue-xmbean.xml">
|    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
|    <depends>jboss.messaging:service=PostOffice</depends>
|    <!-- Clustered must be a direct child of the mbean, as in the topic definitions;
|         nested inside SecurityConfig it is presumably not applied to the queue. -->
|    <attribute name="Clustered">true</attribute>
|    <attribute name="SecurityConfig">
|       <security>
|          <role name="guest" read="true" write="true"/>
|          <role name="publisher" read="true" write="true" create="false"/>
|          <role name="noacc" read="false" write="false" create="false"/>
|       </security>
|    </attribute>
| </mbean>
|
| <!-- Repeat for testDurableTopic0 to testDurableTopic49 -->
| <mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=testDurableTopic0"
| xmbean-dd="xmdesc/Topic-xmbean.xml">
| <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
| <depends>jboss.messaging:service=PostOffice</depends>
| <!-- Clustered is a direct mbean attribute; SecurityConfig holds only the <security> roles. -->
| <attribute name="Clustered">true</attribute>
| <attribute name="SecurityConfig">
| <security>
| <role name="guest" read="true" write="true"/>
| <role name="publisher" read="true" write="true" create="false"/>
| <!-- create="true": this role may create (durable) subscriptions on the topic. -->
| <role name="durpublisher" read="true" write="true" create="true"/>
| </security>
| </attribute>
| </mbean>
|
I have a mysql-persistence-service.xml with the appropriate Clustered attribute and SQL to set up the users, passwords, roles and ClientIDs:
| <attribute name="Clustered">true</attribute>
| ...
| POPULATE.TABLES.69 = INSERT INTO JBM_USER (USER_ID,PASSWD,CLIENTID) VALUES ('john49', 'needle', 'DurableSubscriberExample49')
| POPULATE.TABLES.119 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('durpublisher','john49')
|
and I set ServerPeerID to 0 and 1 respectively in messaging-service.xml for the two nodes.
JBoss is running on RHEL 4 JRE 1.5.0_12
JBoss 4.2.2 GA with JBM 1.4.0.SP1 configured as per documentation.
Pojo sender and receiver are running on WinXP JRE 1.5.0_12.
View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4111165#4111165
Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4111165
More information about the jboss-user
mailing list