[jboss-cvs] JBoss Messaging SVN: r7637 - trunk/src/main/org/jboss/messaging/ra/inflow.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 29 06:17:35 EDT 2009
Author: ataylor
Date: 2009-07-29 06:17:35 -0400 (Wed, 29 Jul 2009)
New Revision: 7637
Modified:
trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
Log:
changed JCA to use transactions directly instead of endpoints
Modified: trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-07-29 08:53:42 UTC (rev 7636)
+++ trunk/src/main/org/jboss/messaging/ra/inflow/JBMMessageHandler.java 2009-07-29 10:17:35 UTC (rev 7637)
@@ -33,11 +33,12 @@
import org.jboss.messaging.utils.SimpleString;
import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
import javax.jms.MessageListener;
-import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
-import javax.transaction.SystemException;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.UUID;
@@ -109,7 +110,7 @@
}
SimpleString queueName = new SimpleString(JBossTopic.createQueueNameForDurableSubscription(activation.getActivationSpec()
- .getClientID(),
+ .getClientID(),
subscriptionName));
SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
@@ -247,7 +248,7 @@
try
{
- ((MessageListener)endpoint).onMessage(jbm);
+ ((MessageListener) endpoint).onMessage(jbm);
}
catch (Throwable t)
{
@@ -284,11 +285,11 @@
try
{
return new XATransactionDemarcationStrategy();
- }
- catch (Throwable t)
- {
- log.error(this + " error creating transaction demarcation ", t);
- }
+ }
+ catch (Throwable t)
+ {
+ log.error(this + " error creating transaction demarcation ", t);
+ }
}
else
{
@@ -405,6 +406,8 @@
{
private final TransactionManager tm = activation.getTransactionManager();
+ private Transaction trans;
+
public void start() throws Throwable
{
final int timeout = activation.getActivationSpec().getTransactionTimeout();
@@ -418,38 +421,105 @@
tm.setTransactionTimeout(timeout);
}
- endpoint.beforeDelivery(JBMActivation.ONMESSAGE);
- }
- public void error()
- {
+ tm.begin();
+
try
{
+ trans = tm.getTransaction();
+
+ if (trace)
+ {
+ log.trace(this + " using tx=" + trans);
+ }
+
+ if (!trans.enlistResource(session))
+ {
+ throw new JMSException("could not enlist resource");
+ }
+ if (trace)
+ {
+ log.trace(this + " XAResource '" + session + " enlisted.");
+ }
+
+ }
+ catch (Throwable t)
+ {
try
{
- tm.getTransaction().setRollbackOnly();
+ tm.rollback();
}
- catch (SystemException e)
+ catch (Throwable ignored)
{
- log.error("Unable to mark transaction as rollback only", e);
+ log.trace(this + " ignored error rolling back after failed enlist", ignored);
}
- endpoint.afterDelivery();
+ throw t;
}
- catch (ResourceException e)
+ }
+
+ public void error()
+ {
+ // Mark for rollback TX via TM
+ try
{
- log.error("Error calling after delivery on endpoint", e);
+ if (trace)
+ {
+ log.trace(this + " using TM to mark TX for rollback tx=" + trans);
+ }
+
+ trans.setRollbackOnly();
}
+ catch (Throwable t)
+ {
+ log.error(this + " failed to set rollback only", t);
+ }
}
public void end()
{
try
{
- endpoint.afterDelivery();
+ // Use the TM to commit the Tx (assert the correct association)
+ Transaction currentTx = tm.getTransaction();
+ if (!trans.equals(currentTx))
+ {
+ throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
+ }
+
+ // Marked rollback
+ if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+ {
+ if (trace)
+ {
+ log.trace(this + " rolling back JMS transaction tx=" + trans);
+ }
+
+ // Actually roll it back
+ tm.rollback();
+
+ }
+ else if (trans.getStatus() == Status.STATUS_ACTIVE)
+ {
+ // Commit tx
+ // This will happen if
+ // a) everything goes well
+ // b) app. exception was thrown
+ if (trace)
+ {
+ log.trace(this + " commiting the JMS transaction tx=" + trans);
+ }
+
+ tm.commit();
+
+ }
+ else
+ {
+ tm.suspend();
+ }
}
- catch (ResourceException e)
+ catch (Throwable t)
{
- log.error("Error calling after delivery on endpoint", e);
+ log.error(this + " failed to commit/rollback", t);
}
}
}
More information about the jboss-cvs-commits mailing list