[jboss-user] [JBoss Messaging] - Client unable to reconnect to JBoss 1.0.1.SP4 after restart

bander do-not-reply at jboss.com
Wed Feb 21 02:39:43 EST 2007


Hello, we're using JBoss 4.0.4GA with Messaging 1.0.1.SP4.

I've been testing a basic failure scenario where our application is forced to reconnect to JBoss Messaging after experiencing a JBoss connectivity problem.

In theory, when a JBoss connectivity problem is detected (via an ExceptionListener that we set on the JMS Connection), we should be able to shut down all our message producers and consumers and recreate them using a new Connection, or at least keep trying to do so until a connection is obtained.
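
As a minimal sketch of the "keep trying until a connection is obtained" idea, independent of JMS: the class name RetryUntilConnected, the method retry, and its parameters are made up for illustration and are not part of JBoss Messaging or of the test case in this post. The Callable stands in for whatever creates the connection, session and producer/consumer.

```java
import java.util.concurrent.Callable;

// Hypothetical helper for illustration only; not a JBoss or JMS API.
final class RetryUntilConnected {

    /**
     * Calls attempt until it returns without throwing, sleeping delayMillis
     * between failed attempts. Gives up after maxAttempts tries (or if the
     * calling thread is interrupted) and throws IllegalStateException with
     * the last failure as its cause.
     */
    static <T> T retry(Callable<T> attempt, int maxAttempts, long delayMillis) {
        Exception last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.call();
            } catch (Exception ex) {
                last = ex; // remember the failure and try again
            }
            if (i < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    break;
                }
            }
        }
        throw new IllegalStateException("Gave up reconnecting", last);
    }
}
```

In a client like ours, the Callable would wrap the createConnection()/createSession() sequence. The pause between attempts keeps the client from hammering a server that is still starting up.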

What I'm finding is that about half of the time our producers and consumers can successfully reconnect. Frequently, however, they appear to become 'stuck' in the JBoss client when attempting to tidy up after the problem with the Connection is detected. By tidy up I mean we attempt to close our producers/consumers, sessions and finally the connections themselves, before starting over and recreating everything (i.e. the connection, session, etc.).
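
One way to stop a hung close() from blocking the reconnect loop, and to find out which close call hangs, is to run each close on a helper thread and wait for it with a timeout. The following is only a sketch under that assumption: BoundedClose and closeWithTimeout are hypothetical names, not a JBoss Messaging API, and this does not fix the underlying hang.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration only; not a JBoss or JMS API.
final class BoundedClose {

    // Daemon threads, so a close() stuck in a socket read cannot keep the JVM alive.
    private static final ExecutorService CLOSER =
            Executors.newCachedThreadPool(new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "bounded-close");
                    t.setDaemon(true);
                    return t;
                }
            });

    /**
     * Runs closeAction on a helper thread and waits at most timeoutMillis.
     * Returns true if the action finished (normally or by throwing),
     * false if it was still running when the timeout expired.
     */
    static boolean closeWithTimeout(Callable<Void> closeAction, long timeoutMillis) {
        Future<Void> future = CLOSER.submit(closeAction);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException ex) {
            return true; // the close itself threw; it is at least not stuck
        } catch (TimeoutException ex) {
            // Best effort only: a thread blocked in a plain socket read is
            // generally not woken by interruption, so it may stay stuck.
            future.cancel(true);
            return false;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In the test case further down, each producer.close(), consumer.close(), session.close() and connection.close() call could be wrapped in a Callable and passed to closeWithTimeout, logging whenever it returns false. The abandoned helper thread stays blocked, so this trades a stuck application thread for a leaked helper thread.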

I've written a test case to demonstrate this. What the test case does is spawn two threads. One thread is used for dispatching messages. This thread creates its own connection, session and message producer. The other thread is used for message listening. This thread also creates its own connection, session and message consumer (with message listener). The dispatcher sends messages to the same queue the listener has been configured on.

Before running the test case, configure the queue name to be used.
Note that the test case has a dependency on log4j and commons logging so you'll need those jars handy.
Start JBoss.
When you run the test case, the dispatcher will start sending messages which will be received by the listener.

You should see the following output being repeated over and over:


  | 17:39:36,187 INFO  @Dispatcher.1 [MultipleServerTest] Dispatcher.1 dispatched message
  | 17:39:36,187 INFO  @Thread-12 [MultipleServerTest] Listener.1 received message
  | 17:39:37,203 INFO  @Dispatcher.1 [MultipleServerTest] Dispatcher.1 dispatched message
  | 17:39:37,203 INFO  @Thread-12 [MultipleServerTest] Listener.1 received message
  | 

Stop and restart your JBoss server. What *should* happen each time is that the dispatcher and listener recover and start sending/receiving messages again. What I'm frequently seeing is that either the dispatcher or the listener, or sometimes both, do not recover. You may need to stop and start the server several times before the behaviour is observed.

The listener/dispatcher threads seem to be getting stuck here:


  | Thread [ListenerManager.1] (Suspended)
  | 	SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int) line: not available [native method]
  | 	SocketInputStream.read(byte[], int, int) line: 129
  | 	BufferedInputStream.fill() line: 183
  | 	BufferedInputStream.read() line: 201
  | 	DataInputStream(FilterInputStream).read() line: 66
  | 	JBossObjectInputStream.read() line: 193
  | 	SocketClientInvoker(MicroSocketClientInvoker).readVersion(InputStream) line: 902
  | 	SocketClientInvoker(MicroSocketClientInvoker).transport(String, Object, Map, Marshaller, UnMarshaller) line: 552
  | 	SocketClientInvoker(MicroRemoteClientInvoker).invoke(InvocationRequest) line: 122
  | 	Client.invoke(Object, Map, InvokerLocator) line: 1414
  | 	Client.invoke(Object, Map) line: 511
  | 	ClientConsumerDelegate(DelegateSupport).invoke(Invocation) line: 111
  | 	ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ConsumerAspect.handleClosing(Invocation) line: 108
  | 	ConsumerAspect25.invoke(Invocation) line: not available
  | 	ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ClosedInterceptor.invoke(Invocation) line: 182
  | 	PerInstanceInterceptor.invoke(Invocation) line: 117
  | 	ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ExceptionInterceptor.invoke(Invocation) line: 69
  | 	ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ClientLogInterceptor.invoke(Invocation) line: 107
  | 	ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ClientConsumerDelegate.closing() line: not available
  | 	JBossMessageConsumer.close() line: 96
  | 	MultipleServerTest$ListenerManagerThread.run() line: 190
  | 
	
and here:


  | Thread [Dispatcher.1] (Suspended)
  | 	SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int) line: not available [native method]
  | 	SocketInputStream.read(byte[], int, int) line: 129
  | 	BufferedInputStream.fill() line: 183
  | 	BufferedInputStream.read() line: 201
  | 	DataInputStream(FilterInputStream).read() line: 66
  | 	JBossObjectInputStream.read() line: 193
  | 	SocketClientInvoker(MicroSocketClientInvoker).readVersion(InputStream) line: 902
  | 	SocketClientInvoker(MicroSocketClientInvoker).transport(String, Object, Map, Marshaller, UnMarshaller) line: 552
  | 	SocketClientInvoker(MicroRemoteClientInvoker).invoke(InvocationRequest) line: 122
  | 	Client.invoke(Object, Map, InvokerLocator) line: 1414
  | 	Client.invoke(Object, Map) line: 511
  | 	ClientConnectionDelegate(DelegateSupport).invoke(Invocation) line: 111
  | 	ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ClosedInterceptor.invoke(Invocation) line: 182
  | 	PerInstanceInterceptor.invoke(Invocation) line: 117
  | 	ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ExceptionInterceptor.invoke(Invocation) line: 69
  | 	ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ClientLogInterceptor.invoke(Invocation) line: 107
  | 	ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
  | 	ClientConnectionDelegate.closing() line: not available
  | 	JBossConnection.close() line: 131
  | 	MultipleServerTest$DispatcherThread.run() line: 99
  | 
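
To capture similar traces from inside the client rather than from a debugger, something along these lines could be called from a watchdog or a shutdown hook. It is a rough sketch: StuckThreadDump and dump are hypothetical names, and the only library call relied on is the standard Thread.getAllStackTraces() (Java 5 and later).

```java
import java.util.Map;

// Hypothetical helper for illustration only.
final class StuckThreadDump {

    /**
     * Returns the current stack traces of all live threads whose name
     * starts with namePrefix, one "Thread [name] (STATE)" header per
     * thread followed by its frames.
     */
    static String dump(String namePrefix) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e
                : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (!t.getName().startsWith(namePrefix)) {
                continue; // not one of the threads we are interested in
            }
            out.append("Thread [").append(t.getName()).append("] (")
               .append(t.getState()).append(")\n");
            for (StackTraceElement frame : e.getValue()) {
                out.append('\t').append(frame).append('\n');
            }
        }
        return out.toString();
    }
}
```

For the test case in this post, dump("Dispatcher.") and dump("ListenerManager.") would select the two worker threads, since those are the names passed to setName().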

I also see errors in the JBoss log like:


  | jvm 1    | 14:58:07,906 ERROR [ServerThread] SocketServerInvoker[0.0.0.0:4489].invoke() call failed: Object with oid: -2147483646 was not found in the Dispatcher
  | jvm 1    | 14:58:07,906 ERROR [ServerThread] SocketServerInvoker[0.0.0.0:4489].invoke() call failed: Object with oid: -2147483642 was not found in the Dispatcher
  | jvm 1    | 14:58:07,921 ERROR [STDERR] java.lang.ClassCircularityError: org/jboss/messaging/core/plugin/IdBlock
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.jms.server.remoting.JMSWireFormat.write(JMSWireFormat.java:342)
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.remoting.transport.socket.ServerThread.versionedWrite(ServerThread.java:778)
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:585)
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:363)
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:159)
  | jvm 1    | 14:58:07,921 ERROR [STDERR] java.lang.ClassCircularityError: org/jboss/messaging/core/plugin/IdBlock
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.jms.server.remoting.JMSWireFormat.write(JMSWireFormat.java:342)
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.remoting.transport.socket.ServerThread.versionedWrite(ServerThread.java:778)
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:585)
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:363)
  | jvm 1    | 14:58:07,921 ERROR [STDERR]  at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:159)	
  | 	

Anyway, here is my test case:


  | import java.util.Hashtable;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.ExceptionListener;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | 
  | import org.apache.commons.logging.Log;
  | import org.apache.commons.logging.LogFactory;
  | 
  | public class RecoverTest {
  | 
  | 	class DispatcherThread extends Thread {
  | 		private ConnectionFactory connectionFactory;
  | 
  | 		private String id;
  | 
  | 		private boolean initialised = false;
  | 
  | 		private Queue queue;
  | 
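  | 		// volatile: set by the ExceptionListener callback thread, read in run()
  | 		private volatile boolean recycle = false;
  | 
  | 		// volatile: set by shutdown() from another thread, read in run()
  | 		private volatile boolean shutdown = false;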
  | 
  | 		public DispatcherThread(ConnectionFactory connectionFactory,//
  | 				Queue queue, String id) {
  | 			super();
  | 			this.connectionFactory = connectionFactory;
  | 			this.queue = queue;
  | 			this.id = id;
  | 			this.setName(id);
  | 		}
  | 
  | 		private boolean isRecycle() {
  | 			return recycle;
  | 		}
  | 
  | 		public void run() {
  | 			Connection connection = null;
  | 			Session session = null;
  | 			MessageProducer producer = null;
  | 			ExceptionListener exceptionListener = null;
  | 
  | 			while (!shutdown) {
  | 				if (!initialised) {
  | 					try {
  | 						connection = connectionFactory.createConnection();
  | 						exceptionListener = new ExceptionListener() {
  | 							public void onException(JMSException ex) {
  | 								LOG.error("Received connection exception", ex);
  | 								recycle = true;
  | 							}
  | 						};
  | 						connection.setExceptionListener(exceptionListener);
  | 						session = connection.createSession(false,
  | 								Session.AUTO_ACKNOWLEDGE);
  | 						producer = session.createProducer(queue);
  | 						LOG.info(id + " initialised");
  | 						initialised = true;
  | 					} catch (JMSException ex) {
  | 						LOG.error("Caught exception during initialisation", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 				if (isRecycle()) {
  | 					if (producer != null) {
  | 						try {
  | 							producer.close();
  | 						} catch (Exception ex) {
  | 							LOG.error("Caught exception during producer close",
  | 									ex);
  | 						}
  | 					}
  | 					if (session != null) {
  | 						try {
  | 							session.close();
  | 						} catch (Exception ex) {
  | 							LOG.error("Caught exception during session close",
  | 									ex);
  | 						}
  | 					}
  | 					if (connection != null) {
  | 						try {
  | 							connection.close();
  | 						} catch (Exception ex) {
  | 							LOG.error(
  | 									"Caught exception during connection close",
  | 									ex);
  | 						}
  | 					}
  | 					producer = null;
  | 					session = null;
  | 					connection = null;
  | 					initialised = false;
  | 					recycle = false;
  | 				}
  | 				if (initialised && (!recycle) && (!shutdown)) {
  | 					try {
  | 						Thread.sleep(1000);
  | 						Message message = session
  | 								.createTextMessage("This is a test");
  | 						producer.send(message);
  | 						LOG.info(id + " dispatched message");
  | 					} catch (Exception ex) {
  | 						LOG.error("Caught exception during send", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 			}
  | 		}
  | 
  | 		public void shutdown() {
  | 			LOG.info(id + " is shutting down");
  | 			recycle = true;
  | 			shutdown = true;
  | 		}
  | 	}
  | 
  | 	class ListenerManagerThread extends Thread implements ExceptionListener {
  | 		private ConnectionFactory connectionFactory;
  | 
  | 		private String id;
  | 
  | 		private boolean initialised = false;
  | 
  | 		private MessageListener messageListener;
  | 
  | 		private Queue queue;
  | 
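  | 		// volatile: set by onException() on the provider's thread, read in run()
  | 		private volatile boolean recycle = false;
  | 
  | 		// volatile: set by shutdown() from another thread, read in run()
  | 		private volatile boolean shutdown = false;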
  | 
  | 		public ListenerManagerThread(ConnectionFactory connectionFactory,
  | 				Queue queue, MessageListener messageListener, String id) {
  | 			super();
  | 			this.connectionFactory = connectionFactory;
  | 			this.queue = queue;
  | 			this.messageListener = messageListener;
  | 			this.id = id;
  | 			this.setName(id);
  | 		}
  | 
  | 		private boolean isRecycle() {
  | 			return recycle;
  | 		}
  | 
  | 		public void onException(JMSException ex) {
  | 			LOG.error("Received connection exception", ex);
  | 			recycle = true;
  | 		}
  | 
  | 		public void run() {
  | 			Connection connection = null;
  | 			Session session = null;
  | 			MessageConsumer consumer = null;
  | 
  | 			while (!shutdown) {
  | 				if (!initialised) {
  | 					try {
  | 						connection = connectionFactory.createConnection();
  | 						connection.setExceptionListener(this);
  | 						session = connection.createSession(false,
  | 								Session.AUTO_ACKNOWLEDGE);
  | 						consumer = session.createConsumer(queue);
  | 						consumer.setMessageListener(messageListener);
  | 						connection.start();
  | 						LOG.info(id + " initialised");
  | 						initialised = true;
  | 					} catch (JMSException ex) {
  | 						LOG.error("Caught exception during initialisation", ex);
  | 						recycle = true;
  | 					}
  | 				}
  | 				if (isRecycle()) {
  | 					if (consumer != null) {
  | 						try {
  | 							consumer.setMessageListener(null);
  | 							consumer.close();
  | 						} catch (Exception ex) {
  | 							LOG.error("Caught exception during consumer close",
  | 									ex);
  | 						}
  | 					}
  | 					if (session != null) {
  | 						try {
  | 							session.close();
  | 						} catch (Exception ex) {
  | 							LOG.error("Caught exception during session close",
  | 									ex);
  | 						}
  | 					}
  | 					if (connection != null) {
  | 						try {
  | 							connection.close();
  | 						} catch (Exception ex) {
  | 							LOG.error(
  | 									"Caught exception during connection close",
  | 									ex);
  | 						}
  | 					}
  | 					consumer = null;
  | 					session = null;
  | 					connection = null;
  | 					initialised = false;
  | 					recycle = false;
  | 				}
  | 				try {
  | 					Thread.sleep(1000);
  | 				} catch (InterruptedException ex) {
  | 					LOG.error("Caught exception during sleep", ex);
  | 				}
  | 			}
  | 		}
  | 
  | 		public void shutdown() {
  | 			LOG.info(id + " is shutting down");
  | 			recycle = true;
  | 			shutdown = true;
  | 		}
  | 	}
  | 
  | 	class SimpleListener implements MessageListener {
  | 
  | 		private String id;
  | 
  | 		public SimpleListener(String id) {
  | 			super();
  | 			this.id = id;
  | 		}
  | 
  | 		/**
  | 		 * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
  | 		 */
  | 		public void onMessage(Message message) {
  | 			LOG.info(id + " received message");
  | 		}
  | 
  | 	}
  | 
  | 	private static final Log LOG = LogFactory.getLog(RecoverTest.class);
  | 
  | 	public static void main(String[] args) {
  | 		RecoverTest test = new RecoverTest();
  | 
  | 		try {
  | 			test.start();
  | 		} catch (Throwable ex) {
  | 			LOG.error("Caught exception in main", ex);
  | 		}
  | 	}
  | 
  | 	private void start() throws Exception {
  | 		// Setup connection 1
  | 		Hashtable properties1 = new Hashtable();
  | 		properties1.put(Context.INITIAL_CONTEXT_FACTORY,
  | 				"org.jnp.interfaces.NamingContextFactory");
  | 		properties1.put(Context.URL_PKG_PREFIXES,
  | 				"org.jboss.naming:org.jnp.interfaces");
  | 		properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  | 		properties1.put(Context.SECURITY_PRINCIPAL, "admin");
  | 		properties1.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  | 		ConnectionFactory connectionFactory1 = null;
  | 		Queue queue1 = null;
  | 		Context context1 = null;
  | 
  | 		context1 = new InitialContext(properties1);
  | 		connectionFactory1 = (ConnectionFactory) context1
  | 				.lookup("ConnectionFactory");
  | 		queue1 = (Queue) context1.lookup("/queue/testQueue");
  | 
  | 		MessageListener listener1 = new SimpleListener("Listener.1");
  | 		ListenerManagerThread manager1 = new ListenerManagerThread(
  | 				connectionFactory1, queue1, listener1, "ListenerManager.1");
  | 		manager1.start();
  | 
  | 		DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1,
  | 				queue1, "Dispatcher.1");
  | 		dispatcher1.start();
  | 
  | 		Thread.sleep(600000);
  | 
  | 		manager1.shutdown();
  | 		manager1.join();
  | 		dispatcher1.shutdown();
  | 		dispatcher1.join();
  | 		context1.close();
  | 	}
  | }
  | 

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4019739#4019739
