[hornetq-commits] JBoss hornetq SVN: r7966 - trunk/src/main/org/hornetq/ra/inflow.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Sep 17 05:54:17 EDT 2009
Author: ataylor
Date: 2009-09-17 05:54:17 -0400 (Thu, 17 Sep 2009)
New Revision: 7966
Modified:
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-134 - reverted to option B, which is now followed exactly, but left in the local TX optimisation, which is off by default
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2009-09-17 08:54:21 UTC (rev 7965)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2009-09-17 09:54:17 UTC (rev 7966)
@@ -13,12 +13,15 @@
package org.hornetq.ra.inflow;
import java.util.UUID;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.ResourceException;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -66,10 +69,7 @@
private final HornetQActivation activation;
- /**
- * The transaction demarcation strategy factory
- */
- private final DemarcationStrategyFactory strategyFactory = new DemarcationStrategyFactory();
+ private boolean useLocalTx;
public HornetQMessageHandler(final HornetQActivation activation, final ClientSession session)
{
@@ -103,7 +103,7 @@
SimpleString queueName = new SimpleString(HornetQTopic.createQueueNameForDurableSubscription(activation.getActivationSpec()
.getClientID(),
- subscriptionName));
+ subscriptionName));
SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
@@ -160,6 +160,7 @@
// Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+ useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
{
endpoint = endpointFactory.createEndpoint(session);
@@ -213,321 +214,49 @@
log.trace("onMessage(" + message + ")");
}
- TransactionDemarcationStrategy txnStrategy = strategyFactory.getStrategy();
- try
- {
- txnStrategy.start();
- }
- catch (Throwable throwable)
- {
- log.warn("Unable to create transaction: " + throwable.getMessage());
- txnStrategy = new NoTXTransactionDemarcationStrategy();
- }
-
HornetQMessage msg = HornetQMessage.createMessage(message, session);
-
+ boolean beforeDelivery = false;
try
{
+ endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
+ beforeDelivery = true;
msg.doBeforeReceive();
message.acknowledge();
- }
- catch (Exception e)
- {
- log.error("Failed to prepare message for receipt", e);
-
- return;
- }
-
- try
- {
((MessageListener) endpoint).onMessage(msg);
- }
- catch (Throwable t)
- {
- log.error("Unexpected error delivering message " + message, t);
- txnStrategy.error();
- }
- finally
- {
- txnStrategy.end();
- }
- }
-
- /**
- * Demarcation strategy factory
- */
- private class DemarcationStrategyFactory
- {
- /**
- * Get the transaction demarcation strategy
- *
- * @return The strategy
- */
- TransactionDemarcationStrategy getStrategy()
- {
- if (trace)
+ endpoint.afterDelivery();
+ if(useLocalTx)
{
- log.trace("getStrategy()");
+ session.commit();
}
-
- if (activation.isDeliveryTransacted())
- {
- if (!activation.getActivationSpec().isUseLocalTx())
- {
- try
- {
- return new XATransactionDemarcationStrategy();
- }
- catch (Throwable t)
- {
- log.error(this + " error creating transaction demarcation ", t);
- }
- }
- else
- {
- return new LocalDemarcationStrategy();
- }
-
- }
- else
- {
- if (!activation.getActivationSpec().isUseLocalTx())
- {
- return new NoTXTransactionDemarcationStrategy();
- }
- else
- {
- return new LocalDemarcationStrategy();
- }
- }
-
- return null;
}
- }
-
- /**
- * Transaction demarcation strategy
- */
- private interface TransactionDemarcationStrategy
- {
- /*
- * Start
- */
- void start() throws Throwable;
-
- /**
- * Error
- */
- void error();
-
- /**
- * End
- */
- void end();
- }
-
- /**
- * Local demarcation strategy
- */
- private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
- {
- private boolean rolledBack = false;
- /*
- * Start
- */
-
- public void start()
+ catch (Throwable e)
{
- }
-
- /**
- * Error
- */
- public void error()
- {
- if (trace)
+ log.error("Failed to deliver message", e);
+ //we need to call before/afterDelivery as a pair
+ if(beforeDelivery)
{
- log.trace("error()");
- }
-
- if (session != null)
- {
try
{
- session.rollback();
- rolledBack = true;
+ endpoint.afterDelivery();
}
- catch (HornetQException e)
+ catch (ResourceException e1)
{
- log.error("Failed to rollback session transaction", e);
+ log.warn("Unable to call after delivery");
}
}
- }
-
- /**
- * End
- */
- public void end()
- {
- if (trace)
+ if(useLocalTx)
{
- log.trace("end()");
- }
-
- if (!rolledBack)
- {
- if (session != null)
- {
- try
- {
- session.commit();
- }
- catch (HornetQException e)
- {
- log.error("Failed to commit session transaction", e);
- }
- }
- }
- }
- }
-
- /**
- * XA demarcation strategy
- */
- private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
- {
- private final TransactionManager tm = activation.getTransactionManager();
-
- private Transaction trans;
-
- public void start() throws Throwable
- {
- final int timeout = activation.getActivationSpec().getTransactionTimeout();
-
- if (timeout > 0)
- {
- if (trace)
- {
- log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
- }
-
- tm.setTransactionTimeout(timeout);
- }
-
- tm.begin();
-
- try
- {
- trans = tm.getTransaction();
-
- if (trace)
- {
- log.trace(this + " using tx=" + trans);
- }
-
- if (!trans.enlistResource(session))
- {
- throw new JMSException("could not enlist resource");
- }
- if (trace)
- {
- log.trace(this + " XAResource '" + session + " enlisted.");
- }
-
- }
- catch (Throwable t)
- {
try
{
- tm.rollback();
+ session.rollback();
}
- catch (Throwable ignored)
+ catch (HornetQException e1)
{
- log.trace(this + " ignored error rolling back after failed enlist", ignored);
+ log.warn("Unable to roll local transaction back");
}
- throw t;
}
}
- public void error()
- {
- // Mark for tollback TX via TM
- try
- {
- if (trace)
- {
- log.trace(this + " using TM to mark TX for rollback tx=" + trans);
- }
-
- trans.setRollbackOnly();
- }
- catch (Throwable t)
- {
- log.error(this + " failed to set rollback only", t);
- }
- }
-
- public void end()
- {
- try
- {
- // Use the TM to commit the Tx (assert the correct association)
- Transaction currentTx = tm.getTransaction();
- if (!trans.equals(currentTx))
- {
- throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
- }
-
- // Marked rollback
- if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
- {
- if (trace)
- {
- log.trace(this + " rolling back JMS transaction tx=" + trans);
- }
-
- // Actually roll it back
- tm.rollback();
-
- }
- else if (trans.getStatus() == Status.STATUS_ACTIVE)
- {
- // Commit tx
- // This will happen if
- // a) everything goes well
- // b) app. exception was thrown
- if (trace)
- {
- log.trace(this + " commiting the JMS transaction tx=" + trans);
- }
-
- tm.commit();
-
- }
- else
- {
- tm.suspend();
- }
- }
- catch (Throwable t)
- {
- log.error(this + " failed to commit/rollback", t);
- }
- }
}
- private class NoTXTransactionDemarcationStrategy implements TransactionDemarcationStrategy
- {
- public void start() throws Throwable
- {
- }
-
- public void error()
- {
- }
-
- public void end()
- {
- }
- }
}
More information about the hornetq-commits
mailing list