[jboss-user] [JBoss Messaging] - XA transaction

relgames do-not-reply at jboss.com
Fri Jul 6 13:28:19 EDT 2007


Hi!

We have confronted with strange issue.

We have :
1) MDB1 that send JMS messages
2) MDB2 that receives message

Scenario is:
1. MDB1 persists entity and sends JMS message with ID of entity persisted
2. MDB2 received message and calls EntityManager.find() with id received.

Some times step 2 fails:  entity manager doesn't contains entity!
It seems that MDB receives message earlier than entity manager fully persists entity.

We use JmsXA as Connection Factory.

Here is the code:


  |     // MDB 1
  |     public void onMessage(Message message) {
  |         try {
  |             
  |             ... some code...
  | 
  |             Domain d = new Domain("some name");
  |             manager.persist(d);
  |             
  |             list.add(d);
  | 
  |             .... some code
  |             
  |             MessageHelper.sendMessages(list, new String[]{
  |                 Constants.JmsQueueNames.DOMAIN_OWNER_APPROVER,
  |                 Constants.JmsQueueNames.TRADEMARK_APPROVER,
  |                 Constants.JmsQueueNames.ADULT_APPROVER});
  |         } catch (JMSException e) {
  |             log.error(e);
  |             throw new EJBException(e);
  |         }
  |         log.debug("finished");
  |     }
  | 
  | /// MDB 2
  |     public void onMessage(Message message) {
  |           .. some code ...
  | 
  |             Domain domain = MessageHelper.getObject(message, manager, Domain.class);
  | 
  |           .. some code ...
  |            
  | }
  | 
  | 
  | public class MessageHelper {
  | 
  |     private static Log log = LogFactory.getLog(MessageHelper.class);
  | 
  |     private static final String ID_KEY = "ENTITY_ID";
  | 
  |     public static <T> T getObject(Message message, EntityManager em, Class<T> objClass) throws JMSException {
  |         Long pk = message.getLongProperty(ID_KEY);
  |         if (pk==null) {
  |             StringBuffer str = new StringBuffer();
  |             str.append("PK is null!\n");
  |             Enumeration props = message.getPropertyNames();
  |             while (props.hasMoreElements()) {
  |                 String prop = (String)props.nextElement();
  |                 str.append(prop).append("=").append(message.getObjectProperty(prop).toString()).append('\n');
  |             }
  |             log.warn(str.toString());
  |         }
  |         T obj = em.find(objClass, pk);
  |         if (obj==null) {
  |             log.warn("Object is null!, pk = " + pk);
  |         }
  |         return obj;
  |     }
  | 
  |     public static void sendMessage(Long objectId, String queue) {
  |         List<Long> list = new ArrayList<Long>();
  |         list.add(objectId);
  |         sendMessages(list, new String[]{queue});
  |     }
  | 
  |     public static void sendMessages(List<Long> objectIds, String[] queues) {
  |         Connection connection = null;
  |         Session session = null;
  |         try {
  |             connection = getConnectionFactory().createConnection();
  |             connection.start();
  | 
  |             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  | 
  |             List<Message> messages = new ArrayList<Message>();
  | 
  |             for (Long id : objectIds) {
  |                 Message message = session.createMessage();
  |                 message.setLongProperty(ID_KEY, id);
  |                 messages.add(message);
  |             }
  | 
  |             sendJMSMessages(queues, session, messages);
  |         } catch (JMSException ex) {
  |             log.error(ex);
  |             throw new EJBException("error sending message", ex);
  |         } catch (NamingException e) {
  |             log.error(e);
  |             throw new EJBException("error sending message", e);
  |         } finally {
  |             closeSession(session);
  |             closeConnection(connection);
  |         }
  |     }
  | 
  |     private static void sendListObjectMessage(List objects, String[] queues) {
  |         Connection connection = null;
  |         Session session = null;
  |         try {
  |             connection = getConnectionFactory().createConnection();
  |             connection.start();
  | 
  |             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  | 
  |             List<Message> messages = new ArrayList<Message>();
  | 
  |             for (Object object : objects) {
  |                 ObjectMessage message = session.createObjectMessage();
  |                 message.setObject((Serializable) object);
  |                 messages.add(message);
  |             }
  | 
  |             sendJMSMessages(queues, session, messages);
  |         } catch (JMSException ex) {
  |             log.error(ex);
  |             throw new EJBException("error sending message", ex);
  |         } catch (NamingException e) {
  |             log.error(e);
  |             throw new EJBException("error sending message", e);
  |         } finally {
  |             closeSession(session);
  |             closeConnection(connection);
  |         }
  |     }
  | 
  |     private static void sendJMSMessages(String[] queues, Session session, List<Message> messages) throws JMSException, NamingException {
  |         MessageProducer producer = session.createProducer(null);
  |         try {
  |             for (String q : queues) {
  |                 Destination dest = getQueue(q);
  |                 for (Message message : messages) {
  |                     producer.send(dest, message);
  |                 }
  |             }
  |         } finally {
  |             if (producer!=null) {
  |                 producer.close();
  |             }
  |         }
  |     }
  | 
  |     private static void sendObjectMessage(Serializable object, String[] queues) {
  |         ArrayList<Serializable> list = new ArrayList<Serializable>();
  |         list.add(object);
  |         sendListObjectMessage(list, queues);
  |     }
  | 
  |     public static void sendObjectMessage(Serializable object, String queue) {
  |         sendObjectMessage(object, new String[]{queue});
  |     }
  | 
  |     private static Queue getQueue(String name) throws NamingException {
  |         return (Queue) getContext().lookup(name);
  |     }
  | 
  |     private static Context _ctx;
  | 
  |     /**
  |      * http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossHAJNDIUseCluster
  |      */
  |     private static Context getContext() throws NamingException {
  |         if (null == _ctx) {
  |             /*
  |             String partitionName = System.getProperty("jboss.partition.name");
  |             if (null != partitionName && partitionName.length() > 0) {
  |                 Properties p = new Properties();
  |                 p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  |                 p.put(Context.URL_PKG_PREFIXES, "jboss.naming:org.jnp.interfaces");
  |                 p.put("jnp.partitionName", partitionName);
  |                 _ctx = new InitialContext(p);
  |             } else {
  |                 log.warn("can't find cluster partition name");
  |                 _ctx = new InitialContext();
  |             }
  |             */
  |             _ctx = new InitialContext();
  |         }
  |         return _ctx;
  |     }
  | 
  | 
  | /**
  |      * Closes the JMS connection.
  |      */
  |     private static void closeConnection(Connection connection) {
  |         try {
  |             if (connection != null)
  |                 connection.close();
  |         } catch (JMSException e) {
  |             log.warn("Could not close JMS connection", e);
  |         }
  |     }
  | 
  |     /**
  |      * Closes the JMS session.
  |      */
  |     private static void closeSession(Session session) {
  |         try {
  |             if (session != null)
  |                 session.close();
  |         } catch (JMSException e) {
  |             log.warn("Could not close JMS session", e);
  |         }
  |     }
  | 
  |     private static ConnectionFactory getConnectionFactory() throws NamingException {
  |         return (ConnectionFactory) getContext().lookup(Constants.JMS_CONNECTION_FACTORY_NAME);
  |     }
  | 
  |     public static void sendMessage(Long id, QueueConnectionFactory factory, String queue) {
  |         sendMessage(id, queue);
  |     }
  | 
  |     public static void sendMessages(List<Long> list, QueueConnectionFactory factory, String[] strings) {
  |         sendMessages(list, strings);
  |     }
  | 
  |     public static void sendObjectMessage(Serializable obj, QueueConnectionFactory factory, String queue) {
  |         sendObjectMessage(obj, queue);
  |     }
  | 
  | 
  | 

debug log is very big, but i've putted it here: http://tvpayrev.belhard.com/feeds/t.txt

you can see that MDB1 finished (first line), and last line informs us that MDB2 can't load entity.




View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4061407#4061407

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4061407



More information about the jboss-user mailing list