[jboss-user] [JBoss Messaging] - XA transaction
relgames
do-not-reply at jboss.com
Fri Jul 6 13:28:19 EDT 2007
Hi!
We have run into a strange issue.
We have :
1) MDB1, which sends JMS messages
2) MDB2, which receives those messages
Scenario is:
1. MDB1 persists an entity and sends a JMS message containing the ID of the persisted entity.
2. MDB2 receives the message and calls EntityManager.find() with the ID it received.
Sometimes step 2 fails: the entity manager doesn't contain the entity!
It seems that MDB2 receives the message before the entity manager has fully persisted the entity.
We use JmsXA as Connection Factory.
Here is the code:
| // MDB 1
| public void onMessage(Message message) { // MDB1 handler: persists a Domain, then notifies three approver queues
| try {
| // (section elided by the poster)
| ... some code...
| // Create and persist the new entity whose ID is to be announced over JMS.
| Domain d = new Domain("some name");
| manager.persist(d); // NOTE(review): presumably the row only becomes visible to other transactions once the container commits - confirm
| // NOTE(review): MessageHelper.sendMessages takes List<Long>, yet a Domain is added here; the declaration of `list` is elided - verify what is actually sent
| list.add(d);
| // (section elided by the poster)
| .... some code
| // Fan the collected items out to every approver queue via the shared helper.
| MessageHelper.sendMessages(list, new String[]{
| Constants.JmsQueueNames.DOMAIN_OWNER_APPROVER,
| Constants.JmsQueueNames.TRADEMARK_APPROVER,
| Constants.JmsQueueNames.ADULT_APPROVER});
| } catch (JMSException e) { // NOTE(review): sendMessages as posted wraps JMSException itself, so this catch presumably serves the elided code - confirm
| log.error(e); // the exception is passed as the log message object
| throw new EJBException(e); // rethrown as an unchecked EJBException carrying the original cause
| }
| log.debug("finished"); // this "finished" line is logged before the method returns
| }
|
| /// MDB 2
| public void onMessage(Message message) { // MDB2 handler: resolves the Domain referenced by the incoming message
| .. some code ...
| // Reads the ENTITY_ID property from the message and looks the entity up through the EntityManager.
| Domain domain = MessageHelper.getObject(message, manager, Domain.class); // may be null: getObject returns whatever EntityManager.find() yields
| // (section elided by the poster)
| .. some code ...
| // NOTE(review): the elided code presumably has to cope with domain == null - verify
| }
|
|
| public class MessageHelper {
|
| private static Log log = LogFactory.getLog(MessageHelper.class);
|
| private static final String ID_KEY = "ENTITY_ID";
|
| public static <T> T getObject(Message message, EntityManager em, Class<T> objClass) throws JMSException {
| Long pk = message.getLongProperty(ID_KEY);
| if (pk==null) {
| StringBuffer str = new StringBuffer();
| str.append("PK is null!\n");
| Enumeration props = message.getPropertyNames();
| while (props.hasMoreElements()) {
| String prop = (String)props.nextElement();
| str.append(prop).append("=").append(message.getObjectProperty(prop).toString()).append('\n');
| }
| log.warn(str.toString());
| }
| T obj = em.find(objClass, pk);
| if (obj==null) {
| log.warn("Object is null!, pk = " + pk);
| }
| return obj;
| }
|
| /**
|  * Sends a single entity ID to one queue by delegating to
|  * {@link #sendMessages(List, String[])} with one-element arguments.
|  *
|  * @param objectId ID to place in the message
|  * @param queue    JNDI name of the target queue
|  */
| public static void sendMessage(Long objectId, String queue) {
|     final String[] singleQueue = new String[]{queue};
|     final List<Long> singleId = new ArrayList<Long>();
|     singleId.add(objectId);
|     sendMessages(singleId, singleQueue);
| }
|
| public static void sendMessages(List<Long> objectIds, String[] queues) {
| Connection connection = null;
| Session session = null;
| try {
| connection = getConnectionFactory().createConnection();
| connection.start();
|
| session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
| List<Message> messages = new ArrayList<Message>();
|
| for (Long id : objectIds) {
| Message message = session.createMessage();
| message.setLongProperty(ID_KEY, id);
| messages.add(message);
| }
|
| sendJMSMessages(queues, session, messages);
| } catch (JMSException ex) {
| log.error(ex);
| throw new EJBException("error sending message", ex);
| } catch (NamingException e) {
| log.error(e);
| throw new EJBException("error sending message", e);
| } finally {
| closeSession(session);
| closeConnection(connection);
| }
| }
|
| /**
|  * Wraps each object in a JMS {@code ObjectMessage} and sends every message to every listed queue.
|  *
|  * @param objects payloads; each element is cast to {@code Serializable}
|  * @param queues  JNDI names of the target queues
|  * @throws EJBException if a JMS or naming error occurs; the original exception is the cause
|  */
| private static void sendListObjectMessage(List<?> objects, String[] queues) {
|     Connection connection = null;
|     Session session = null;
|     try {
|         connection = getConnectionFactory().createConnection();
|         connection.start();
|
|         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|         List<Message> messages = new ArrayList<Message>();
|
|         for (Object object : objects) {
|             ObjectMessage message = session.createObjectMessage();
|             message.setObject((Serializable) object);
|             messages.add(message);
|         }
|
|         sendJMSMessages(queues, session, messages);
|     } catch (JMSException ex) {
|         // Log with an explicit message so the throwable (and its stack trace) is recorded
|         // as the cause rather than rendered as the message text.
|         log.error("error sending message", ex);
|         throw new EJBException("error sending message", ex);
|     } catch (NamingException e) {
|         log.error("error sending message", e);
|         throw new EJBException("error sending message", e);
|     } finally {
|         // Session first, then connection; both helpers tolerate null.
|         closeSession(session);
|         closeConnection(connection);
|     }
| }
|
| private static void sendJMSMessages(String[] queues, Session session, List<Message> messages) throws JMSException, NamingException {
| MessageProducer producer = session.createProducer(null);
| try {
| for (String q : queues) {
| Destination dest = getQueue(q);
| for (Message message : messages) {
| producer.send(dest, message);
| }
| }
| } finally {
| if (producer!=null) {
| producer.close();
| }
| }
| }
|
| private static void sendObjectMessage(Serializable object, String[] queues) {
| ArrayList<Serializable> list = new ArrayList<Serializable>();
| list.add(object);
| sendListObjectMessage(list, queues);
| }
|
| /**
|  * Sends one serializable object to a single queue.
|  *
|  * @param object payload to send
|  * @param queue  JNDI name of the target queue
|  */
| public static void sendObjectMessage(Serializable object, String queue) {
|     final String[] singleQueue = new String[]{queue};
|     sendObjectMessage(object, singleQueue);
| }
|
| private static Queue getQueue(String name) throws NamingException {
| return (Queue) getContext().lookup(name);
| }
|
| private static Context _ctx;
|
| /**
|  * Returns the shared naming context, creating it on first use.
|  * <p>
|  * The method is synchronized so that concurrent first callers cannot each create (and
|  * overwrite) a context, and so the assignment to the static field is visible to all threads.
|  * NOTE(review): the returned context is still shared between callers; confirm that the
|  * naming provider in use tolerates concurrent lookups on one instance.
|  * <p>
|  * Background on the disabled cluster-aware variant kept below:
|  * http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossHAJNDIUseCluster
|  *
|  * @return the cached {@code InitialContext}
|  * @throws NamingException if the context cannot be created
|  */
| private static synchronized Context getContext() throws NamingException {
|     if (null == _ctx) {
|         /*
|         String partitionName = System.getProperty("jboss.partition.name");
|         if (null != partitionName && partitionName.length() > 0) {
|         Properties p = new Properties();
|         p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
|         p.put(Context.URL_PKG_PREFIXES, "jboss.naming:org.jnp.interfaces");
|         p.put("jnp.partitionName", partitionName);
|         _ctx = new InitialContext(p);
|         } else {
|         log.warn("can't find cluster partition name");
|         _ctx = new InitialContext();
|         }
|         */
|         _ctx = new InitialContext();
|     }
|     return _ctx;
| }
|
|
| /**
|  * Closes the given JMS connection if there is one. A failure to close is
|  * logged at WARN level and otherwise ignored.
|  *
|  * @param connection connection to close; may be {@code null}
|  */
| private static void closeConnection(Connection connection) {
|     if (connection == null) {
|         return;
|     }
|     try {
|         connection.close();
|     } catch (JMSException e) {
|         log.warn("Could not close JMS connection", e);
|     }
| }
|
| /**
|  * Closes the given JMS session if there is one. A failure to close is
|  * logged at WARN level and otherwise ignored.
|  *
|  * @param session session to close; may be {@code null}
|  */
| private static void closeSession(Session session) {
|     if (session == null) {
|         return;
|     }
|     try {
|         session.close();
|     } catch (JMSException e) {
|         log.warn("Could not close JMS session", e);
|     }
| }
|
| private static ConnectionFactory getConnectionFactory() throws NamingException {
| return (ConnectionFactory) getContext().lookup(Constants.JMS_CONNECTION_FACTORY_NAME);
| }
|
| /**
|  * Overload that accepts a connection factory argument; the body does not use
|  * {@code factory} and simply delegates to {@link #sendMessage(Long, String)}.
|  *
|  * @param id      ID to place in the message
|  * @param factory not used by this method
|  * @param queue   JNDI name of the target queue
|  */
| public static void sendMessage(Long id, QueueConnectionFactory factory, String queue) {
|     // The factory argument is intentionally left untouched here.
|     MessageHelper.sendMessage(id, queue);
| }
|
| /**
|  * Overload that accepts a connection factory argument; the body does not use
|  * {@code factory} and simply delegates to {@link #sendMessages(List, String[])}.
|  *
|  * @param list    IDs to announce
|  * @param factory not used by this method
|  * @param strings JNDI names of the target queues
|  */
| public static void sendMessages(List<Long> list, QueueConnectionFactory factory, String[] strings) {
|     // The factory argument is intentionally left untouched here.
|     MessageHelper.sendMessages(list, strings);
| }
|
| /**
|  * Overload that accepts a connection factory argument; the body does not use
|  * {@code factory} and simply delegates to {@link #sendObjectMessage(Serializable, String)}.
|  *
|  * @param obj     payload to send
|  * @param factory not used by this method
|  * @param queue   JNDI name of the target queue
|  */
| public static void sendObjectMessage(Serializable obj, QueueConnectionFactory factory, String queue) {
|     // The factory argument is intentionally left untouched here.
|     MessageHelper.sendObjectMessage(obj, queue);
| }
|
|
|
The debug log is very big, so I've put it here: http://tvpayrev.belhard.com/feeds/t.txt
You can see that MDB1 finished (first line), and the last line shows that MDB2 can't load the entity.
View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4061407#4061407
Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4061407
More information about the jboss-user
mailing list