[jboss-user] [JBoss Messaging] - Re: LocalTx, ClientTransaction leak?

ydzsidemiik do-not-reply at jboss.com
Wed Jan 9 08:50:03 EST 2008


Our application is store-and-forward middleware for a telecom messaging protocol. We use JBAS and JBM built off the 'all' JBAS configuration, with Apache MINA, SLF4J, Commons Lang, and the Postgres driver being included in the JBAS configuration's lib directory. We do not use Spring, nor any other third party libraries or frameworks besides these.

The application is composed of two parts; a JCA resource adapter which acts as a protocol driver for this messaging protocol and an EJB3 component which contains the routing logic. The EJB3 component contains both custom MDBs, driven by our resource adapter, and JMS MDBs. The custom MDBs act as listeners and enqueue messages that they receive into JBM queues; the JMS MDBs act as senders, listening to those queues and calling upon the resource adapter again to transmit messages to various external endpoints.

It is true that we open a lot of sessions; because the protocol is not transacted, each message is logically independent of any others. As a result, we enqueue each message using a closure (which you can see listed below) which obtains a connection and a session, then carefully releases both.

Here are some of the traces that seem typical. Again, I do not know why they appear to be cut off. It may be a misunderstanding on my part or a quirk of my profiler.

This one seems to be an invocation of one of the custom MDBs by the RA.
anonymous wrote : 
  | org.jboss.resource.connectionmanager.xa.JcaXAResourceWrapper.end(Xid, int)
  | com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.delistResource(XAResource, int)
  | org.jboss.resource.adapter.jms.JmsSession.close()
  | com.acme.gateway.util.jms.JmsOperation.execute()
  | com.acme.gateway.EnqueuerImpl.enqueue(EnqueuerImpl$QueueName, RoutableRequest, RoutableRequest, int, EnqueuerImpl$Header[])
  | com.acme.gateway.EnqueuerImpl.enqueueOutboundLive(RoutableRequest, Bind)
  | $Proxy164.enqueueOutboundLive(RoutableRequest, Bind)
  | com.acme.gateway.ClientGateway.onRoutable(RoutableRequest)
  | com.acme.gateway.smppra.adapter.PDUListenerCoalescingAdapter.onSubmit(SubmitSM)
  | java.lang.reflect.Method.invoke(Object, Object[])
  | com.acme.gateway.smppra.spi.ClientManagedConnection.dispatch(PDU, MessageEndpoint)
  | com.acme.gateway.smppra.spi.BaseManagedConnection.messageReceived(Object)
  | org.apache.mina.filter.executor.ExecutorFilter$ProcessEventsRunnable.run()
  | com.acme.gateway.smppra.spi.SmppResourceAdapter$3$1.run()
  | org.jboss.util.threadpool.BasicTaskWrapper.run()
  | EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run()
  | java.lang.Thread.run()
  | 

Don't know what this one is.
anonymous wrote : 
  | org.jboss.resource.connectionmanager.xa.JcaXAResourceWrapper.end(Xid, int)
  | com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelPrepare()
  | com.arjuna.ats.arjuna.coordinator.BasicAction.doPrepare(boolean, AbstractRecord)
  | com.arjuna.ats.arjuna.coordinator.BasicAction.doPrepare(boolean)
  | com.arjuna.ats.arjuna.coordinator.BasicAction.prepare(boolean)
  | com.arjuna.ats.arjuna.coordinator.BasicAction.End(boolean)
  | com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(boolean)
  | com.arjuna.ats.arjuna.AtomicAction.commit(boolean)
  | com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate()
  | com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit()
  | com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit()
  | org.jboss.util.threadpool.BasicTaskWrapper.run()
  | EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run()
  | java.lang.Thread.run()
  | 

Another invocation of one of the custom MDBs by the RA.
anonymous wrote : 
  | org.jboss.resource.connectionmanager.xa.JcaXAResourceWrapper.end(Xid, int)
  | com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.delistResource(XAResource, int)
  | org.jboss.resource.adapter.jms.JmsSession.close()
  | com.acme.gateway.util.jms.JmsOperation.execute()
  | com.acme.gateway.EnqueuerImpl.enqueue(EnqueuerImpl$QueueName, RoutableRequest, RoutableRequest, int, EnqueuerImpl$Header[])
  | com.acme.gateway.EnqueuerImpl.enqueueInboundLive(RoutableRequest, String)
  | $Proxy164.enqueueInboundLive(RoutableRequest, String)
  | com.acme.gateway.CarrierGateway.onRoutable(RoutableRequest)
  | com.acme.gateway.smppra.adapter.PDUListenerCoalescingAdapter.onDeliver(DeliverSM)
  | java.lang.reflect.Method.invoke(Object, Object[])
  | com.acme.gateway.smppra.spi.CarrierManagedConnection.dispatch(PDU, MessageEndpoint)
  | com.acme.gateway.smppra.spi.BaseManagedConnection.messageReceived(Object)
  | org.apache.mina.filter.executor.ExecutorFilter$ProcessEventsRunnable.run()
  | com.acme.gateway.smppra.spi.SmppResourceAdapter$3$1.run()
  | org.jboss.util.threadpool.BasicTaskWrapper.run()
  | EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run()
  | java.lang.Thread.run()
  | 

This one is an invocation of a JMS MDB by the container.
anonymous wrote : 
  | org.jboss.resource.connectionmanager.xa.JcaXAResourceWrapper.end(Xid, int)
  | com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.delistResource(XAResource, int)
  | org.jboss.resource.adapter.jms.JmsSession.close()
  | com.acme.gateway.util.jms.JmsOperation.execute()
  | com.acme.gateway.EnqueuerImpl.enqueue(EnqueuerImpl$QueueName, RoutableRequest, RoutableRequest, int, EnqueuerImpl$Header[])
  | com.acme.gateway.EnqueuerImpl.enqueueInboundSent(Message, RoutableRequest, RoutableRequest, Provider, Bind, Aggregation)
  | $Proxy164.enqueueInboundSent(Message, RoutableRequest, RoutableRequest, Provider, Bind, Aggregation)
  | com.acme.gateway.ClientSender.complete()
  | com.acme.gateway.ClientSender.onRoutable(RoutableRequest)
  | com.acme.gateway.smppra.adapter.PDUListenerCoalescingAdapter.onDeliver(DeliverSM)
  | com.logica.smpp.pdu.DeliverSM.accept(PDUListener)
  | com.logica.smpp.pdu.DeliverSM.accept(PDUListener)
  | com.acme.gateway.Sender.onMessage(Message)
  | org.jboss.util.threadpool.BasicTaskWrapper.run()
  | EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run()
  | java.lang.Thread.run()
  | 

Here is the listing of the closure I wrote to help me manage JMS resources, which appears in the traces:

  | package com.acme.gateway.util.jms;
  | 
  | import javax.ejb.EJBException;
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.JMSException;
  | import javax.jms.Session;
  | 
  | public abstract class JmsOperation
  | {
  | 	private static final org.apache.log4j.Logger log = 
  | 		org.apache.log4j.Logger.getLogger(JmsOperation.class);
  | 	
  | 	private final ConnectionFactory connectionFactory;
  | 	private Connection connection;
  | 	private Session session;
  | 	
  | 	public JmsOperation(ConnectionFactory connectionFactory)
  | 	{
  | 		this.connectionFactory = connectionFactory;
  | 	}
  | 	
  | 	protected ConnectionFactory getConnectionFactory()
  | 	{
  | 		return connectionFactory;
  | 	}
  | 	
  | 	protected Connection getConnection()
  | 	{
  | 		return connection;
  | 	}
  | 	
  | 	protected Session getSession()
  | 	{
  | 		return session;
  | 	}
  | 	
  | 	protected abstract void executeInternal() throws JMSException;
  | 	
  | 	public void execute()
  | 	{
  | 		try
  | 		{
  | 			connection = connectionFactory.createConnection();
  | 			
  | 			if (log.isTraceEnabled())
  | 			{
  | 				log.trace("execute: Got connection " + connection);
  | 			}
  | 			
  | 			session = connection.createSession(true, Session.SESSION_TRANSACTED);
  | 			
  | 			if (log.isTraceEnabled())
  | 			{
  | 				log.trace("execute: Got session " + session);
  | 			}
  | 
  | 			executeInternal();
  | 		}
  | 		catch (JMSException e)
  | 		{
  | 			throw new EJBException(e);
  | 		}
  | 		finally
  | 		{
  | 			try
  | 			{
  | 				if (session != null)
  | 				{
  | 					session.close();
  | 				}
  | 			}
  | 			catch (JMSException e)
  | 			{
  | 				log.warn("execute: Failed to close the session", e);
  | 			}
  | 			finally
  | 			{
  | 				session = null;
  | 				
  | 				try
  | 				{
  | 					if (connection != null)
  | 					{
  | 						connection.close();
  | 					}
  | 				}
  | 				catch (JMSException e)
  | 				{
  | 					log.warn("execute: Failed to close the connection", e);
  | 				}
  | 				finally
  | 				{
  | 					connection = null;
  | 				}
  | 			}
  | 		}
  | 	}
  | }
  | 

You would subclass it and implement your JMS code inside executeInternal(), making use of the Connection and Session that would be available during the execution of that function.

Sincere thanks for your time :)

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4118280#4118280

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4118280



More information about the jboss-user mailing list