[infinispan-issues] [JBoss JIRA] Resolved: (ISPN-887) XAResource implementation improvements (TransactionalXAResource)
Manik Surtani (JIRA)
jira-events at lists.jboss.org
Tue May 17 10:22:01 EDT 2011
[ https://issues.jboss.org/browse/ISPN-887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manik Surtani resolved ISPN-887.
--------------------------------
Resolution: Done
> XAResource implementation improvements (TransactionalXAResource)
> ----------------------------------------------------------------
>
> Key: ISPN-887
> URL: https://issues.jboss.org/browse/ISPN-887
> Project: Infinispan
> Issue Type: Feature Request
> Components: Transactions
> Affects Versions: 4.2.0.Final
> Reporter: Mircea Markus
> Assignee: Mircea Markus
> Fix For: 5.0.0.FINAL
>
>
> Reviewing the code with Jonathan Halliday has brought the following aspects (see TODOs below):
> package org.infinispan.transaction.xa;
> import org.infinispan.commands.CommandsFactory;
> import org.infinispan.commands.tx.CommitCommand;
> import org.infinispan.commands.tx.PrepareCommand;
> import org.infinispan.commands.tx.RollbackCommand;
> import org.infinispan.config.Configuration;
> import org.infinispan.context.InvocationContextContainer;
> import org.infinispan.context.impl.LocalTxInvocationContext;
> import org.infinispan.interceptors.InterceptorChain;
> import org.infinispan.util.logging.Log;
> import org.infinispan.util.logging.LogFactory;
> import javax.transaction.xa.XAException;
> import javax.transaction.xa.XAResource;
> import javax.transaction.xa.Xid;
> /**
> * This acts both as an local {@link org.infinispan.transaction.xa.CacheTransaction} and implementor of an {@link
> * javax.transaction.xa.XAResource} that will be called by tx manager on various tx stages.
> *
> * @author Mircea.Markus at jboss.com
> * @since 4.0
> */
> public class TransactionXaAdapter implements XAResource {
> private static final Log log = LogFactory.getLog(TransactionXaAdapter.class);
> private static boolean trace = log.isTraceEnabled();
> //todo - comment why timeout is not used
> // - it is useful only if TM and client are in separate processes and TM might fail. this is because a client might tm.begin and then the TM (running separate process) crashes
> // - in this scenario the TM won't ever call XAResource.rollback, so these resources would be held there forever
> // - not affecting us as in all scenarios TM&XAResource are collocated
> private int txTimeout;
> private final InvocationContextContainer icc;
> private final InterceptorChain invoker;
> private final CommandsFactory commandsFactory;
> private final Configuration configuration;
> private final TransactionTable txTable;
> /**
> * XAResource is associated with a transaction between enlistment (XAResource.start()) XAResource.end(). It's only the
> * boundary methods (prepare, commit, rollback) that need to be "stateless".
> * Reefer to section 3.4.4 from JTA spec v.1.1
> */
> private final LocalTransaction localTransaction;
> public TransactionXaAdapter(LocalTransaction localTransaction, TransactionTable txTable, CommandsFactory commandsFactory,
> Configuration configuration, InterceptorChain invoker, InvocationContextContainer icc) {
> this.localTransaction = localTransaction;
> this.txTable = txTable;
> this.commandsFactory = commandsFactory;
> this.configuration = configuration;
> this.invoker = invoker;
> this.icc = icc;
> }
> /**
> * This can be call for any transaction object. See Section 3.4.6 (Resource Sharing) from JTA spec v1.1.
> */
> public int prepare(Xid xid) throws XAException {
> //todo if I throw an exception here then I should also cleanup resources as .rollback might never be called!!
> LocalTransaction localTransaction = getLocalTransactionAndValidate(xid);
> //todo - same as last comment
> validateNotMarkedForRollback(localTransaction);
> if (configuration.isOnePhaseCommit()) {
> if (trace) log.trace("Received prepare for tx: {0}. Skipping call as 1PC will be used.", xid);
> return XA_OK;
> }
> PrepareCommand prepareCommand = commandsFactory.buildPrepareCommand(localTransaction.getGlobalTransaction(), localTransaction.getModifications(), configuration.isOnePhaseCommit());
> if (trace) log.trace("Sending prepare command through the chain: " + prepareCommand);
> LocalTxInvocationContext ctx = icc.createTxInvocationContext();
> ctx.setLocalTransaction(localTransaction);
> try {
> invoker.invoke(ctx, prepareCommand);
> if (localTransaction.isReadOnly()) {
> if (trace) log.trace("Readonly transaction: " + localTransaction.getGlobalTransaction());
> // force a cleanup to release any objects held. Some TMs don't call commit if it is a READ ONLY tx. See ISPN-845
> commit(xid, false);
> return XA_RDONLY;
> } else {
> return XA_OK;
> }
> } catch (Throwable e) {
> // todo if I throw this exception make sure that all locks are 100% cleaned up, as TM won't do any rollback call on it.
> // todo - handle this! -> if only a node fails to ack tx prepare, and that node is still part of the cluster, it needs to be sync with tx state.
> // one way of doing this is by pushing the tx state to that node until one of two happens: a) node ack or b) node is shunned from the cluster
> log.error("Error while processing PrepareCommand", e);
> throw new XAException(XAException.XAER_RMERR);
> }
> }
> /**
> * Same comment as for {@link #prepare(javax.transaction.xa.Xid)} applies for commit.
> */
> public void commit(Xid xid, boolean isOnePhase) throws XAException {
> LocalTransaction localTransaction = getLocalTransactionAndValidate(xid);
> if (trace) log.trace("committing transaction {0}", localTransaction.getGlobalTransaction());
> try {
> LocalTxInvocationContext ctx = icc.createTxInvocationContext();
> ctx.setLocalTransaction(localTransaction);
> // todo this needs to be split in two:
> // - configuration.isOnePhaseCommit() this is not "as important", as the user ack that it doesn't "really" need consistency
> // - on the other case ("isOnePhase"==true) make sure that this method either commits successfully or it fails and cleans up logs eventually
> if (configuration.isOnePhaseCommit() || isOnePhase) {
> validateNotMarkedForRollback(localTransaction);
> if (trace) log.trace("Doing an 1PC prepare call on the interceptor chain");
> PrepareCommand command = commandsFactory.buildPrepareCommand(localTransaction.getGlobalTransaction(), localTransaction.getModifications(), true);
> try {
> invoker.invoke(ctx, command);
> } catch (Throwable e) {
> log.error("Error while processing 1PC PrepareCommand", e);
> throw new XAException(XAException.XAER_RMERR);
> }
> } else {
> CommitCommand commitCommand = commandsFactory.buildCommitCommand(localTransaction.getGlobalTransaction());
> try {
> invoker.invoke(ctx, commitCommand);
> } catch (Throwable e) {
> log.error("Error while processing 1PC PrepareCommand", e);
> throw new XAException(XAException.XAER_RMERR);
> }
> }
> } finally {
> cleanup(localTransaction);
> }
> }
> /**
> * Same comment as for {@link #prepare(javax.transaction.xa.Xid)} applies for commit.
> */
> public void rollback(Xid xid) throws XAException {
> rollbackImpl(xid, commandsFactory, icc, invoker, txTable);
> }
> public static void rollbackImpl(Xid xid, CommandsFactory commandsFactory, InvocationContextContainer icc, InterceptorChain invoker, TransactionTable txTable) throws XAException {
> LocalTransaction localTransaction = txTable.getLocalTransaction(xid);
> if (localTransaction == null) {
> if (trace) log.trace("no tx found for {0}", xid);
> throw new XAException(XAException.XAER_NOTA);
> }
> if (trace) log.trace("rollback transaction {0} ", localTransaction.getGlobalTransaction());
> RollbackCommand rollbackCommand = commandsFactory.buildRollbackCommand(localTransaction.getGlobalTransaction());
> LocalTxInvocationContext ctx = icc.createTxInvocationContext();
> ctx.setLocalTransaction(localTransaction);
> try {
> invoker.invoke(ctx, rollbackCommand);
> } catch (Throwable e) {
> log.error("Exception while rollback", e);
> throw new XAException(XAException.XA_HEURHAZ);
> } finally {
> cleanupImpl(localTransaction, txTable, icc);
> }
> }
> private LocalTransaction getLocalTransactionAndValidate(Xid xid) throws XAException {
> LocalTransaction localTransaction1 = txTable.getLocalTransaction(xid);
> if (localTransaction1 == null) {
> log.error("This should not happen when XAResource and TM are in the same process! No tx found for {0}", xid);
> throw new XAException(XAException.XAER_NOTA);
> }
> return localTransaction1;
> }
> public void start(Xid xid, int i) throws XAException {
> localTransaction.setXid(xid);
> txTable.addLocalTransactionMapping(localTransaction);
> if (trace) log.trace("start called on tx " + this.localTransaction.getGlobalTransaction());
> }
> public void end(Xid xid, int i) throws XAException {
> if (trace) log.trace("end called on tx " + this.localTransaction.getGlobalTransaction());
> }
> public void forget(Xid xid) throws XAException {
> if (trace) log.trace("forget called");
> }
> public int getTransactionTimeout() throws XAException {
> if (trace) log.trace("start called");
> return txTimeout;
> }
> public boolean isSameRM(XAResource xaResource) throws XAException {
> if (!(xaResource instanceof TransactionXaAdapter)) {
> return false;
> }
> TransactionXaAdapter other = (TransactionXaAdapter) xaResource;
> return other.equals(this);
> }
> public Xid[] recover(int i) throws XAException {
> if (trace) log.trace("recover called: " + i);
> return null;
> }
> public boolean setTransactionTimeout(int i) throws XAException {
> this.txTimeout = i;
> return true;
> }
> @Override
> public boolean equals(Object o) {
> if (this == o) return true;
> if (!(o instanceof TransactionXaAdapter)) return false;
> TransactionXaAdapter that = (TransactionXaAdapter) o;
> return this.localTransaction.equals(that.localTransaction);
> }
> @Override
> public int hashCode() {
> return localTransaction.getGlobalTransaction().hashCode();
> }
> @Override
> public String toString() {
> return "TransactionXaAdapter{" +
> "localTransaction=" + localTransaction +
> '}';
> }
> private void validateNotMarkedForRollback(LocalTransaction localTransaction) throws XAException {
> if (localTransaction.isMarkedForRollback()) {
> if (trace) log.trace("Transaction already marked for rollback: {0}", localTransaction);
> throw new XAException(XAException.XA_RBROLLBACK);
> }
> }
> private void cleanup(LocalTransaction localTransaction) {
> TransactionXaAdapter.cleanupImpl(localTransaction, txTable, icc);
> }
> private static void cleanupImpl(LocalTransaction localTransaction, TransactionTable txTable, InvocationContextContainer icc) {
> txTable.removeLocalTransaction(localTransaction);
> icc.suspend();
> }
> }
--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira
More information about the infinispan-issues
mailing list