Author: manik.surtani(a)jboss.com
Date: 2008-05-13 10:10:18 -0400 (Tue, 13 May 2008)
New Revision: 5834
Added:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
Modified:
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
Log:
JBCACHE-1339: Refactor and simplify TxInterceptor, and other enhancements
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-05-12 20:45:48
UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-05-13 14:10:18
UTC (rev 5834)
@@ -93,9 +93,14 @@
*/
public boolean isInvalidation()
{
- return this.equals(INVALIDATION_SYNC) || this.equals(INVALIDATION_SYNC);
+ return this == INVALIDATION_SYNC || this == INVALIDATION_SYNC;
}
+ public boolean isSynchronous()
+ {
+ return this == REPL_SYNC || this == INVALIDATION_SYNC;
+ }
+
}
public static CacheMode legacyModeToCacheMode(int legacyMode)
Modified: core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java 2008-05-12
20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -254,13 +254,6 @@
return new OptimisticPrepareCommand(gtx, modifications, data, address,
onePhaseCommit);
}
-
- public OptimisticPrepareCommand buildOptimisticPrepareCommand(GlobalTransaction gtx,
ReversibleCommand command)
- {
- List<ReversibleCommand> list = (List<ReversibleCommand>) (command !=
null ? Collections.singletonList(command) : Collections.emptyList());
- return buildOptimisticPrepareCommand(gtx, list, null, null, false);
- }
-
public AnnounceBuddyPoolNameCommand buildAnnounceBuddyPoolNameCommand(Address address,
String buddyPoolName)
{
AnnounceBuddyPoolNameCommand command = new AnnounceBuddyPoolNameCommand(address,
buddyPoolName);
Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -34,12 +34,10 @@
private CommandInterceptor createInterceptor(Class<? extends CommandInterceptor>
clazz) throws IllegalAccessException, InstantiationException
{
-// CommandInterceptor chainedInterceptor =
componentRegistry.getComponent(clazz.getName(), clazz);
CommandInterceptor chainedInterceptor = componentRegistry.getComponent(clazz);
if (chainedInterceptor == null)
{
chainedInterceptor = clazz.newInstance();
-// componentRegistry.registerComponent(clazz.getName(), chainedInterceptor,
clazz);
componentRegistry.registerComponent(chainedInterceptor, clazz);
}
else
@@ -66,7 +64,8 @@
interceptorChain.appendIntereceptor(createInterceptor(CacheMgmtInterceptor.class));
// load the tx interceptor
- interceptorChain.appendIntereceptor(createInterceptor(TxInterceptor.class));
+ interceptorChain.appendIntereceptor(createInterceptor(optimistic ?
OptimisticTxInterceptor.class : TxInterceptor.class));
+
if (configuration.isUseLazyDeserialization())
interceptorChain.appendIntereceptor(createInterceptor(MarshalledValueInterceptor.class));
interceptorChain.appendIntereceptor(createInterceptor(NotificationInterceptor.class));
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -9,7 +9,6 @@
import org.jboss.cache.cluster.ReplicationQueue;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.VisitableCommand;
-import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.config.Option;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.annotations.Inject;
@@ -56,8 +55,7 @@
private void init()
{
usingBuddyReplication = configuration.getBuddyReplicationConfig() != null
&& configuration.getBuddyReplicationConfig().isEnabled();
- CacheMode mode = configuration.getCacheMode();
- defaultSynchronous = (mode == CacheMode.REPL_SYNC || mode ==
CacheMode.INVALIDATION_SYNC);
+ defaultSynchronous = configuration.getCacheMode().isSynchronous();
}
/**
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -43,7 +43,7 @@
}
}
- protected void setTransactionalContext(Transaction tx, GlobalTransaction gtx,
InvocationContext ctx)
+ protected void setTransactionalContext(Transaction tx, GlobalTransaction gtx,
TransactionEntry entry, InvocationContext ctx)
{
if (trace)
{
@@ -52,7 +52,14 @@
}
ctx.setTransaction(tx);
ctx.setGlobalTransaction(gtx);
- if (gtx != null) ctx.setTransactionEntry(txTable.get(gtx));
+ if (entry == null)
+ {
+ if (gtx != null) ctx.setTransactionEntry(txTable.get(gtx));
+ }
+ else
+ {
+ ctx.setTransactionEntry(entry);
+ }
}
/**
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -26,10 +26,8 @@
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.RollbackCommand;
-import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jgroups.Address;
@@ -66,7 +64,6 @@
public class DataGravitatorInterceptor extends BaseRpcInterceptor
{
private BuddyManager buddyManager;
- private boolean syncCommunications = false;
/**
* Map that contains commands that need cleaning up. This is keyed on global
transaction, and contains a list of
* cleanup commands corresponding to all gravitate calls made during the course of the
transaction in question.
@@ -85,12 +82,6 @@
this.cacheSPI = cacheSPI;
}
- @Start
- public void startInterceptor()
- {
- syncCommunications = configuration.getCacheMode() ==
Configuration.CacheMode.REPL_SYNC || configuration.getCacheMode() ==
Configuration.CacheMode.INVALIDATION_SYNC;
- }
-
@Override
public Object visitGetChildrenNamesCommand(InvocationContext ctx,
GetChildrenNamesCommand command) throws Throwable
{
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -44,65 +44,65 @@
@Override
public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command)
throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command)
throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand command)
throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command)
throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
}
@Override
public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command)
throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
}
@Override
public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
}
@Override
public Object visitOptimisticPrepareCommand(InvocationContext ctx,
OptimisticPrepareCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
}
@Override
public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable
{
- return handleAll(ctx, command, null);
+ return handleAll(ctx, command, null, false);
}
@SuppressWarnings("deprecation")
- public Object handleAll(InvocationContext ctx, VisitableCommand command,
GlobalTransaction gtx) throws Throwable
+ private Object handleAll(InvocationContext ctx, VisitableCommand command,
GlobalTransaction gtx, boolean scrubContextOnCompletion) throws Throwable
{
Option optionOverride = ctx.getOptionOverrides();
boolean suppressExceptions = false;
@@ -118,11 +118,11 @@
Transaction tx = getTransaction();
GlobalTransaction realGtx = getGlobalTransaction(tx, gtx);
if (tx == null && realGtx != null && realGtx.isRemote()) tx =
txTable.getLocalTransaction(gtx);
- setTransactionalContext(tx, realGtx, ctx);
+ setTransactionalContext(tx, realGtx, null, ctx);
}
else
{
- setTransactionalContext(null, null, ctx);
+ setTransactionalContext(null, null, null, ctx);
}
if (optionOverride != null)
@@ -134,7 +134,7 @@
if (ctx.getTransaction() != null)
{
suspendedTransaction = txManager.suspend();
- setTransactionalContext(null, null, ctx);
+ setTransactionalContext(null, null, null, ctx);
if (trace) log.trace("Suspending transaction " +
suspendedTransaction);
resumeSuspended = true;
}
@@ -169,10 +169,19 @@
}
finally
{
+ /*
+ * we should scrub txs after every call to prevent race conditions
+ * basically any other call coming in on the same thread and hijacking any
running tx's
+ * was highlighted in JBCACHE-606
+ */
+ if (scrubContextOnCompletion) setTransactionalContext(null, null, null, ctx);
+
// clean up any invocation-scope options set up
if (trace) log.trace("Resetting invocation-scope options");
ctx.getOptionOverrides().reset();
+ // if this is a prepare, opt prepare or
+
if (resumeSuspended)
{
txManager.resume(suspendedTransaction);
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -51,6 +51,8 @@
{
timeout = ctx.getOptionOverrides().getLockAcquisitionTimeout();
}
+
+ boolean succeeded = false;
try
{
TransactionWorkspace<?, ?> workspace = getTransactionWorkspace(ctx);
@@ -75,25 +77,23 @@
}
}
+
+ // locks have acquired so lets pass on up
+ Object retval = invokeNextInterceptor(ctx, command);
+ succeeded = true;
+ return retval;
}
catch (Throwable e)
{
+ succeeded = false;
log.debug("Caught exception attempting to lock nodes ", e);
//we have failed - set to rollback and throw exception
- try
- {
- unlock(ctx, gtx);
- }
- catch (Throwable t)
- {
- // we have failed to unlock - now what?
- log.error("Failed to unlock nodes, after failing to lock nodes during a
prepare! Locks are possibly in a very inconsistent state now!", t);
- }
throw e;
}
-
- // locks have acquired so lets pass on up
- return invokeNextInterceptor(ctx, command);
+ finally
+ {
+ if (!succeeded || command.isOnePhaseCommit()) unlock(ctx, gtx);
+ }
}
@Override
@@ -119,15 +119,7 @@
}
finally
{
- try
- {
- unlock(ctx, getGlobalTransaction(ctx));
- }
- catch (Exception e)
- {
- // we have failed to unlock - now what?
- log.error("Failed to unlock nodes after a commit or rollback! Locks are
possibly in a very inconsistent state now!", e);
- }
+ unlock(ctx, getGlobalTransaction(ctx));
}
return retval;
}
@@ -139,8 +131,16 @@
*/
private void unlock(InvocationContext ctx, GlobalTransaction gtx)
{
- TransactionEntry entry = ctx.getTransactionEntry();
- entry.releaseAllLocksFIFO(gtx);
+ try
+ {
+ TransactionEntry entry = ctx.getTransactionEntry();
+ entry.releaseAllLocksFIFO(gtx);
+ }
+ catch (Exception e)
+ {
+ // we have failed to unlock - now what?
+ log.error("Failed to unlock nodes after a commit or rollback! Locks are
possibly in a very inconsistent state now!", e);
+ }
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -26,7 +26,6 @@
import org.jboss.cache.commands.write.RemoveDataCommand;
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.annotations.Inject;
@@ -187,8 +186,6 @@
protected void broadcastPrepare(OptimisticPrepareCommand command, GlobalTransaction
gtx, InvocationContext ctx) throws Throwable
{
- boolean remoteCallSync = configuration.getCacheMode() ==
Configuration.CacheMode.REPL_SYNC;
-
// this method will return immediately if we're the only member
if (rpcManager.getMembers() != null && rpcManager.getMembers().size() >
1)
{
@@ -210,7 +207,7 @@
{
log.debug("(" + rpcManager.getLocalAddress() + "):
broadcasting prepare for " + gtx + " (" + command.getModificationsCount() +
" modifications");
}
- replicateCall(ctx, toBroadcast, remoteCallSync, ctx.getOptionOverrides());
+ replicateCall(ctx, toBroadcast, defaultSynchronous, ctx.getOptionOverrides());
}
else
{
Added: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -0,0 +1,222 @@
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.commands.AbstractVisitor;
+import org.jboss.cache.commands.VersionedDataCommand;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.write.RemoveDataCommand;
+import org.jboss.cache.commands.write.RemoveKeyCommand;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.OptimisticTransactionEntry;
+import org.jboss.cache.transaction.TransactionEntry;
+
+import javax.transaction.Transaction;
+import java.util.List;
+
+/**
+ * A new interceptor to simplify functionality in the {@link
org.jboss.cache.interceptors.TxInterceptor}.
+ *
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.2.0
+ */
+public class OptimisticTxInterceptor extends TxInterceptor
+{
+ protected final ModificationsReplayVisitor replayVisitor = new
ModificationsReplayVisitor();
+
+ @Override
+ public Object visitOptimisticPrepareCommand(InvocationContext ctx,
OptimisticPrepareCommand command) throws Throwable
+ {
+ // nothing really different from a pessimistic prepare command.
+ return visitPrepareCommand(ctx, command);
+ }
+
+ @Override
+ public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable
+ {
+ try
+ {
+ Transaction tx = ctx.getTransaction();
+ boolean implicitTransaction = tx == null;
+ if (implicitTransaction)
+ {
+ tx = createLocalTx();
+ // we need to attach this tx to the InvocationContext.
+ ctx.setTransaction(tx);
+ }
+
+ try
+ {
+ Object retval = attachGtxAndPassUpChain(ctx, command);
+ if (implicitTransaction)
+ {
+ copyInvocationScopeOptionsToTxScope(ctx);
+ copyForcedCacheModeToTxScope(ctx);
+ txManager.commit();
+ }
+ return retval;
+ }
+ catch (Throwable t)
+ {
+ if (implicitTransaction)
+ {
+ log.warn("Rolling back, exception encountered", t);
+ try
+ {
+ copyInvocationScopeOptionsToTxScope(ctx);
+ copyForcedCacheModeToTxScope(ctx);
+ txManager.rollback();
+ }
+ catch (Throwable th)
+ {
+ log.warn("Roll back failed encountered", th);
+ }
+ throw t;
+ }
+ }
+ }
+ catch (Throwable th)
+ {
+ ctx.throwIfNeeded(th);
+ }
+
+ return null;
+ }
+
+ private void copyForcedCacheModeToTxScope(InvocationContext ctx)
+ {
+ Option optionOverride = ctx.getOptionOverrides();
+ if (optionOverride != null
+ && (optionOverride.isForceAsynchronous() ||
optionOverride.isForceSynchronous()))
+ {
+ TransactionEntry entry = ctx.getTransactionEntry();
+ if (entry != null)
+ {
+ if (optionOverride.isForceAsynchronous())
+ entry.setForceAsyncReplication(true);
+ else
+ entry.setForceSyncReplication(true);
+ }
+ }
+ }
+
+ @Override
+ protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List
modifications, boolean onePhaseCommit)
+ {
+ return commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null,
rpcManager.getLocalAddress(), onePhaseCommit);
+ }
+
+ /**
+ * Replays modifications by passing them up the interceptor chain.
+ *
+ * @throws Throwable
+ */
+ @Override
+ protected boolean replayModifications(InvocationContext ctx, Transaction ltx,
PrepareCommand command) throws Throwable
+ {
+ if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare "
+ ctx.getGlobalTransaction());
+
+ // invoke all modifications by passing them up the chain, setting data versions
first.
+ try
+ {
+ replayVisitor.visitCollection(ctx, command.getModifications());
+ }
+ catch (Throwable t)
+ {
+ log.error("Prepare failed!", t);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
+ {
+ TransactionEntry entry = ctx.getTransactionEntry();
+ if (entry != null)
+ {
+ entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
+ ((OptimisticTransactionEntry) entry).getTransactionWorkSpace().clearNodes();
+ }
+ }
+
+ @Override
+ protected TransactionEntry createNewTransactionEntry(Transaction tx) throws Exception
+ {
+ return new OptimisticTransactionEntry(tx);
+ }
+
+ private class ModificationsReplayVisitor extends AbstractVisitor
+ {
+ @Override
+ public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable
+ {
+ Object result = invokeNextInterceptor(ctx, command);
+ assertTxIsStillValid(ctx.getTransaction());
+ return result;
+ }
+
+ @Override
+ public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand
command) throws Throwable
+ {
+ return handleDataVersionCommand(ctx, command);
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ {
+ return handleDataVersionCommand(ctx, command);
+ }
+
+ @Override
+ public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand
command) throws Throwable
+ {
+ return handleDataVersionCommand(ctx, command);
+ }
+
+ @Override
+ public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand
command) throws Throwable
+ {
+ return handleDataVersionCommand(ctx, command);
+ }
+
+ @Override
+ public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand
command) throws Throwable
+ {
+ return handleDataVersionCommand(ctx, command);
+ }
+
+ private Object handleDataVersionCommand(InvocationContext ctx, VersionedDataCommand
command) throws Throwable
+ {
+ Option originalOption = ctx.getOptionOverrides();
+ if (command.isVersioned())
+ {
+ Option option = new Option();
+ option.setDataVersion(command.getDataVersion());
+ ctx.setOptionOverrides(option);
+ }
+ Object retval;
+ try
+ {
+ retval = invokeNextInterceptor(ctx, command);
+ assertTxIsStillValid(ctx.getTransaction());
+ }
+ catch (Throwable t)
+ {
+ log.error("method invocation failed", t);
+ throw t;
+ }
+ finally
+ {
+ ctx.setOptionOverrides(originalOption);
+ }
+ return retval;
+ }
+ }
+
+}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -136,23 +136,21 @@
}
}
log.debug("Successfully validated nodes");
- return invokeNextInterceptor(ctx, command);
+ Object retval = invokeNextInterceptor(ctx, command);
+ if (command.isOnePhaseCommit())
+ {
+ // do a comit-phase
+ commitTransaction(ctx);
+ }
+ return retval;
}
- @Override
- public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
+ private void commitTransaction(InvocationContext ctx)
{
GlobalTransaction gtx = getGlobalTransaction(ctx);
TransactionWorkspace workspace;
- try
- {
- workspace = getTransactionWorkspace(ctx);
- }
- catch (CacheException e)
- {
- log.warn("we can't rollback", e);
- return invokeNextInterceptor(ctx, command);
- }
+ workspace = getTransactionWorkspace(ctx);
+
if (log.isDebugEnabled()) log.debug("Commiting successfully validated changes
for GlobalTransaction " + gtx);
Collection<WorkspaceNode> workspaceNodes = workspace.getNodes().values();
for (WorkspaceNode workspaceNode : workspaceNodes)
@@ -253,6 +251,12 @@
}
}
}
+ }
+
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
+ {
+ commitTransaction(ctx);
return invokeNextInterceptor(ctx, command);
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-12
20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -13,7 +13,6 @@
import org.jboss.cache.commands.AbstractVisitor;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.ReversibleCommand;
-import org.jboss.cache.commands.VersionedDataCommand;
import org.jboss.cache.commands.VisitableCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
@@ -26,7 +25,6 @@
import org.jboss.cache.commands.write.RemoveDataCommand;
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.ComponentRegistry;
@@ -34,9 +32,9 @@
import org.jboss.cache.invocation.InvocationContextContainer;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import javax.transaction.InvalidTransactionException;
import javax.transaction.Status;
@@ -48,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -61,22 +60,17 @@
*/
public class TxInterceptor extends BaseTransactionalContextInterceptor implements
TxInterceptorMBean
{
- private final static Object NULL = new Object();
-
- private CommandsFactory commandsFactory;
- private RPCManager rpcManager;
+ protected CommandsFactory commandsFactory;
+ protected RPCManager rpcManager;
private Notifier notifier;
private InvocationContextContainer invocationContextContainer;
private ComponentRegistry componentRegistry;
- private final ModificationsReplayVisitor replayVisitorNoInject = new
ModificationsReplayVisitor(false);
- private final ModificationsReplayVisitor replayVisitorWithInject = new
ModificationsReplayVisitor(true);
-
/**
* List <Transaction>that we have registered for
*/
- private final Map transactions = new ConcurrentHashMap(16);
- private final Map rollbackTransactions = new ConcurrentHashMap(16);
+ private final Set<Transaction> transactions = new
ConcurrentHashSet<Transaction>();
+ private final Map<Transaction, GlobalTransaction> rollbackTransactions = new
ConcurrentHashMap<Transaction, GlobalTransaction>(16);
private long prepares = 0;
private long commits = 0;
private long rollbacks = 0;
@@ -94,17 +88,9 @@
}
@Override
- @SuppressWarnings("unchecked")
- public Object visitOptimisticPrepareCommand(InvocationContext ctx,
OptimisticPrepareCommand command) throws Throwable
- {
- return visitPrepareCommand(ctx, command);
- }
-
- @Override
public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
{
Object result = null;
- boolean scrubTxsOnExit = false;
// this is a prepare, commit, or rollback.
if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " +
ctx.getGlobalTransaction());
@@ -113,8 +99,7 @@
if (ctx.getGlobalTransaction().isRemote())
{
result = handleRemotePrepare(ctx, command);
- scrubTxsOnExit = true;
- increasePrepares();
+ if (getStatisticsEnabled()) prepares++;
}
else
{
@@ -126,18 +111,10 @@
{
ctx.throwIfNeeded(e);
}
- finally
- {
- scrubOnExist(ctx, scrubTxsOnExit);
- }
+
return result;
}
- private void increasePrepares()
- {
- if (getStatisticsEnabled()) prepares++;
- }
-
@Override
@SuppressWarnings("unchecked")
public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
@@ -167,7 +144,7 @@
}
if (log.isDebugEnabled()) log.debug(" executing commit() with local TX
" + ltx + " under global tx " + gtx);
txManager.commit();
- increaseCommits();
+ if (getStatisticsEnabled()) commits++;
}
finally
{
@@ -187,10 +164,7 @@
{
ctx.throwIfNeeded(throwable);
}
- finally
- {
- scrubOnExist(ctx, true);
- }
+
return null;
}
@@ -211,7 +185,6 @@
{
log.warn("No local transaction for this remotely originating rollback.
Possibly rolling back before a prepare call was broadcast?");
txTable.remove(gtx);
- scrubOnExist(ctx, true);
return null;
}
// disconnect if we have a current tx associated
@@ -229,7 +202,7 @@
}
if (log.isDebugEnabled()) log.debug("executing with local TX " +
ltx + " under global tx " + gtx);
txManager.rollback();
- increaseRollbacks();
+ if (getStatisticsEnabled()) rollbacks++;
}
finally
{
@@ -251,26 +224,48 @@
{
ctx.throwIfNeeded(throwable);
}
- finally
- {
- scrubOnExist(ctx, true);
- }
+
return null;
}
+ @Override
+ public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable
+ {
+ return invokeNextInterceptor(ctx, command);
+ }
+
/**
- * we should scrub txs after every call to prevent race conditions
- * basically any other call coming in on the same thread and hijacking any running
tx's
- * was highlighted in JBCACHE-606
+ * Tests if we already have a tx running. If so, register a sync handler for this
method invocation.
+ * if not, create a local tx if we're using opt locking.
+ *
+ * @return
+ * @throws Throwable
*/
- private void scrubOnExist(InvocationContext ctx, boolean scrubTxsOnExit)
+ @Override
+ public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable
{
- if (scrubTxsOnExit)
+ try
{
- setTransactionalContext(null, null, ctx);
+ return attachGtxAndPassUpChain(ctx, command);
}
+ catch (Throwable throwable)
+ {
+ ctx.throwIfNeeded(throwable);
+ return null;
+ }
}
+ protected Object attachGtxAndPassUpChain(InvocationContext ctx, VisitableCommand
command) throws Throwable
+ {
+ Transaction tx = ctx.getTransaction();
+ if (tx != null) attachGlobalTransaction(ctx, tx, command);
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ // ------------------------------------------------------------------------
+ // JMX statistics
+ // ------------------------------------------------------------------------
+
public long getPrepares()
{
return prepares;
@@ -306,42 +301,56 @@
// --------------------------------------------------------------
+ /**
+ * Handles a remotely originating prepare call, by creating a local transaction for
the remote global transaction
+ * and replaying modifications in this new local transaction.
+ *
+ * @param ctx invocation context
+ * @param command prepare command
+ * @return result of the prepare, typically a null.
+ * @throws Throwable in the event of problems.
+ */
private Object handleRemotePrepare(InvocationContext ctx, PrepareCommand command)
throws Throwable
{
+ // the InvocationContextInterceptor would have set this for us
GlobalTransaction gtx = ctx.getGlobalTransaction();
- // Is there a local transaction associated with GTX ?
+
+ // Is there a local transaction associated with GTX? (not the current tx
associated with the thread, which may be
+ // in the invocation context
Transaction ltx = txTable.getLocalTransaction(gtx);
Transaction currentTx = txManager.getTransaction();
- Object retval = null;
+ Object retval = null;
+ boolean success = false;
try
{
if (ltx == null)
{
if (currentTx != null) txManager.suspend();
- ltx = createLocalTxForGlobalTx(gtx, ctx);// creates new LTX and associates it
with a GTX
+ // create a new local transaction
+ ltx = createLocalTx();
+ // associate this with a global tx
+ txTable.put(ltx, gtx);
+ if (trace) log.trace("Created new tx for gtx " + gtx);
+
if (log.isDebugEnabled())
- {
log.debug("Started new local tx as result of remote prepare: local
tx=" + ltx + " (status=" + ltx.getStatus() + "), gtx=" + gtx);
- }
}
else
{
//this should be valid
- if (!ctx.isValidTransaction())
+ if (!TransactionTable.isValid(ltx))
throw new CacheException("Transaction " + ltx + " not in
correct state to be prepared");
- //associate this thread with this ltx if this ltx is NOT the current tx.
+ //associate this thread with the local transaction associated with the global
transaction, IF the localTx is NOT the current tx.
if (currentTx == null || !ltx.equals(currentTx))
{
+ if (trace) log.trace("Suspending current tx " + currentTx);
txManager.suspend();
txManager.resume(ltx);
}
}
- if (trace)
- {
- log.trace("Resuming existing transaction " + ltx + ", global
TX=" + gtx);
- }
+ if (trace) log.trace("Resuming existing tx " + ltx + ", global
tx=" + gtx);
// at this point we have a non-null ltx
@@ -353,239 +362,42 @@
if (entry == null)
{
// create a new transaction entry
- entry = configuration.isNodeLockingOptimistic() ? new
OptimisticTransactionEntry(ltx) : new TransactionEntry(ltx);
- log.debug("creating new tx entry");
+ if (log.isDebugEnabled()) log.debug("creating new tx entry");
+ entry = createNewTransactionEntry(ltx);
txTable.put(gtx, entry);
- ctx.setTransactionEntry(entry);
- if (trace) log.trace("TxTable contents: " + txTable);
}
- setTransactionalContext(ltx, gtx, ctx);
+ setTransactionalContext(ltx, gtx, entry, ctx);
+
// register a sync handler for this tx.
- registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx);
+ registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, entry), ctx);
- if (configuration.isNodeLockingOptimistic())
+ // replay modifications
+ success = replayModifications(ctx, ltx, command);
+
+ // now pass the prepare command up the chain as well.
+ if (command.isOnePhaseCommit())
{
- retval = handleOptimisticPrepare(ctx, gtx, ltx, (OptimisticPrepareCommand)
command);
+ if (trace)
+ log.trace("Using one-phase prepare. Not propagating the prepare call
up the stack until called to do so by the sync handler.");
}
else
{
- retval = handlePessimisticPrepare(ctx, ltx, command);
+ // now pass up the prepare method itself.
+ invokeNextInterceptor(ctx, command);
}
+ // JBCACHE-361 Confirm that the transaction is ACTIVE
+ assertTxIsStillValid(ltx);
}
finally
{
- txManager.suspend();// suspends ltx - could be null
- // resume whatever else we had going.
- if (currentTx != null) txManager.resume(currentTx);
- if (log.isDebugEnabled()) log.debug("Finished remote prepare " +
gtx);
- }
+ // if we are running a one-phase commit, perform a commit or rollback now.
+ if (trace) log.trace("Are we running a 1-phase commit? " +
command.isOnePhaseCommit());
- return retval;
- }
- // handler methods.
- // --------------------------------------------------------------
-
-
- @Override
- public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable
- {
- return invokeNextInterceptor(ctx, command);
- }
-
- /**
- * Tests if we already have a tx running. If so, register a sync handler for this
method invocation.
- * if not, create a local tx if we're using opt locking.
- *
- * @return
- * @throws Throwable
- */
- @Override
- public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable
- {
- Object result;
- try
- {
- Transaction tx = ctx.getTransaction();
- // if there is no current tx and we're using opt locking, we need to use an
implicit tx.
- boolean implicitTransaction = configuration.isNodeLockingOptimistic() &&
tx == null;
- if (implicitTransaction)
- {
- tx = createLocalTx();
- // we need to attach this tx to the InvocationContext.
- ctx.setTransaction(tx);
- }
- if (tx != null)
- attachGlobalTransaction(ctx, tx, command);
-
- GlobalTransaction gtx = ctx.getGlobalTransaction();
-
- try
- {
- result = invokeNextInterceptor(ctx, command);
- if (implicitTransaction)
- {
- copyInvocationScopeOptionsToTxScope(ctx);
- copyForcedCacheModeToTxScope(ctx);
- txManager.commit();
- }
- }
- catch (Throwable t)
- {
- if (implicitTransaction)
- {
- log.warn("Rolling back, exception encountered", t);
- try
- {
- setTransactionalContext(tx, gtx, ctx);
- txManager.rollback();
- }
- catch (Throwable th)
- {
- log.warn("Roll back failed encountered", th);
- }
- throw t;
- }
- else
- {
- throw t;
- }
- }
- return result;
- }
- catch (Throwable throwable)
- {
- ctx.throwIfNeeded(throwable);
- return null;
- }
- }
-
- private void copyForcedCacheModeToTxScope(InvocationContext ctx)
- {
- Option optionOverride = ctx.getOptionOverrides();
- if (optionOverride != null
- && (optionOverride.isForceAsynchronous() ||
optionOverride.isForceSynchronous()))
- {
- TransactionEntry entry = ctx.getTransactionEntry();
- if (entry != null)
- {
- if (optionOverride.isForceAsynchronous())
- entry.setForceAsyncReplication(true);
- else
- entry.setForceSyncReplication(true);
- }
- }
- }
-
- private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction
tx, VisitableCommand command) throws Throwable
- {
- if (trace)
- {
- log.trace(" local transaction exists - registering global tx if not present
for " + Thread.currentThread());
- }
- if (trace)
- {
- GlobalTransaction tempGtx = txTable.get(tx);
- log.trace("Associated gtx in txTable is " + tempGtx);
- }
-
- // register a sync handler for this tx - only if the globalTransaction is not
remotely initiated.
- GlobalTransaction gtx = registerTransaction(tx, ctx);
- if (gtx != null)
- {
- command = replaceGtx(command, gtx);
- }
- else
- {
- // get the current globalTransaction from the txTable.
- gtx = txTable.get(tx);
- }
-
- // make sure we attach this globalTransaction to the invocation context.
- ctx.setGlobalTransaction(gtx);
-
- return command;
- }
-
- /**
- * This is called by invoke() if we are in a remote gtx's prepare() phase.
- * Finds the appropriate tx, suspends any existing txs, registers a sync handler
- * and passes up the chain.
- * <p/>
- * Resumes any existing txs before returning.
- *
- * @throws Throwable
- */
- private Object handleOptimisticPrepare(InvocationContext ctx, GlobalTransaction gtx,
Transaction ltx, OptimisticPrepareCommand command) throws Throwable
- {
- Object retval;
- if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare "
+ gtx);
- replayVisitorWithInject.visitCollection(ctx, command.getModifications());
- retval = invokeNextInterceptor(ctx, command);
- // JBCACHE-361 Confirm that the transaction is ACTIVE
- if (!TransactionTable.isActive(ltx))
- {
- throw new ReplicationException("prepare() failed -- local transaction
status is not STATUS_ACTIVE;" +
- " is " + ltx.getStatus());
- }
- return retval;
- }
-
- private Object handlePessimisticPrepare(InvocationContext ctx, Transaction ltx,
PrepareCommand command) throws Exception
- {
- boolean success = true;
- Object retval = null;
- try
- {
- // now pass up the prepare method itself.
- try
- {
- replayVisitorNoInject.visitCollection(ctx, command.getModifications());
- if (command.isOnePhaseCommit())
- {
- if (trace)
- log.trace("Using one-phase prepare. Not propagating the prepare
call up the stack until called to do so by the sync handler.");
- }
- else
- {
- retval = invokeNextInterceptor(ctx, command);
- }
-
- // JBCACHE-361 Confirm that the transaction is ACTIVE
- if (!ctx.isValidTransaction())
- {
- throw new ReplicationException("prepare() failed -- " +
- "local transaction status is not valid;" +
- " is " + ltx.getStatus());
- }
- }
- catch (Throwable th)
- {
- log.error("prepare method invocation failed", th);
- retval = th;
- success = false;
- if (retval instanceof Exception)
- {
- throw (Exception) retval;
- }
- }
- }
- finally
- {
-
- if (trace)
- {
- log.trace("Are we running a 1-phase commit? " +
command.isOnePhaseCommit());
- }
- // 4. If commit == true (one-phase-commit): commit (or rollback) the TX; this
will cause
- // {before/after}Completion() to be called in all registered interceptors:
the TransactionInterceptor
- // will then commit/rollback against the cache
-
if (command.isOnePhaseCommit())
{
try
{
- // invokeOnePhaseCommitMethod(globalTransaction,
modifications.size() > 0, success);
if (success)
{
ltx.commit();
@@ -618,92 +430,76 @@
transactions.remove(ltx);// JBAS-298
}
}
+
+ txManager.suspend();// suspends ltx - could be null
+ // resume whatever else we had going.
+ if (currentTx != null) txManager.resume(currentTx);
+ if (log.isDebugEnabled()) log.debug("Finished remote prepare " +
gtx);
}
+
return retval;
}
- public class ModificationsReplayVisitor extends AbstractVisitor
+ protected TransactionEntry createNewTransactionEntry(Transaction tx) throws Exception
{
- private final boolean injectDataVersions;
+ return new TransactionEntry(tx);
+ }
- public ModificationsReplayVisitor(boolean injectDataVersions)
+ private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction
tx, VisitableCommand command) throws Throwable
+ {
+ if (trace)
{
- this.injectDataVersions = injectDataVersions;
+ log.trace(" local transaction exists - registering global tx if not present
for " + Thread.currentThread());
}
-
- @Override
- public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable
+ if (trace)
{
- Object result = invokeNextInterceptor(ctx, command);
- assertTxIsActive(ctx);
- return result;
+ GlobalTransaction tempGtx = txTable.get(tx);
+ log.trace("Associated gtx in txTable is " + tempGtx);
}
- @Override
- public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand
command) throws Throwable
+ // register a sync handler for this tx - only if the globalTransaction is not
remotely initiated.
+ GlobalTransaction gtx = registerTransaction(tx, ctx);
+ if (gtx != null)
{
- return handleDataVersionCommand(ctx, command);
+ command = replaceGtx(command, gtx);
}
-
- @Override
- public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ else
{
- return handleDataVersionCommand(ctx, command);
+ // get the current globalTransaction from the txTable.
+ gtx = txTable.get(tx);
}
- @Override
- public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand
command) throws Throwable
- {
- return handleDataVersionCommand(ctx, command);
- }
+ // make sure we attach this globalTransaction to the invocation context.
+ ctx.setGlobalTransaction(gtx);
- @Override
- public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand
command) throws Throwable
- {
- return handleDataVersionCommand(ctx, command);
- }
+ return command;
+ }
- @Override
- public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand
command) throws Throwable
+ /**
+ * Replays modifications
+ *
+ * @param ctx
+ * @param ltx
+ * @param command
+ * @return
+ * @throws Exception
+ */
+ protected boolean replayModifications(InvocationContext ctx, Transaction ltx,
PrepareCommand command) throws Throwable
+ {
+ try
{
- return handleDataVersionCommand(ctx, command);
- }
-
- private Object handleDataVersionCommand(InvocationContext ctx, VersionedDataCommand
command) throws Throwable
- {
- if (!injectDataVersions) return handleDefault(ctx, command);
- Option originalOption = ctx.getOptionOverrides();
- if (command.isVersioned())
+ // replay modifications
+ for (ReversibleCommand modification : command.getModifications())
{
- Option option = new Option();
- option.setDataVersion(command.getDataVersion());
- ctx.setOptionOverrides(option);
+ invokeNextInterceptor(ctx, modification);
+ assertTxIsStillValid(ltx);
}
- Object retval;
- try
- {
- retval = invokeNextInterceptor(ctx, command);
- assertTxIsActive(ctx);
- }
- catch (Throwable t)
- {
- log.error("method invocation failed", t);
- throw t;
- }
- finally
- {
- ctx.setOptionOverrides(originalOption);
- }
- return retval;
+ return true;
}
-
- private void assertTxIsActive(InvocationContext ctx)
- throws SystemException
+ catch (Throwable th)
{
- if (!TransactionTable.isActive(ctx.getTransaction()))
- {
- throw new ReplicationException("prepare() failed -- " + "local
transaction status is not STATUS_ACTIVE; is " + ctx.getTransaction().getStatus());
- }
+ log.error("prepare failed!", th);
+ return false;
}
}
@@ -719,16 +515,6 @@
}
}
- private void increaseCommits()
- {
- if (getStatisticsEnabled()) commits++;
- }
-
- private void increaseRollbacks()
- {
- if (getStatisticsEnabled()) rollbacks++;
- }
-
/**
* Handles a commit or a rollback. Called by the synch handler. Simply tests that we
are in the correct tx and
* passes the meth call up the interceptor chain.
@@ -760,38 +546,24 @@
// Transaction phase runners
// --------------------------------------------------------------
+ protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List
modifications, boolean onePhaseCommit)
+ {
+ return commandsFactory.buildPrepareCommand(gtx, modifications,
rpcManager.getLocalAddress(), onePhaseCommit);
+ }
+
/**
- * creates a commit() MethodCall and feeds it to handleCommitRollback();
+ * creates a commit()
*/
- protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx,
Transaction tx, List modifications, List clModifications, boolean onePhaseCommit)
+ protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List
modifications, List clModifications, boolean onePhaseCommit)
{
// set the hasMods flag in the invocation ctx. This should not be replicated, just
used locally by the interceptors.
ctx.setTxHasMods(modifications != null && modifications.size() > 0);
ctx.setCacheLoaderHasMods(clModifications != null && clModifications.size()
> 0);
try
{
- VisitableCommand commitCommand;
- if (onePhaseCommit)
- {
- // running a 1-phase commit.
- if (configuration.isNodeLockingOptimistic())
- {
- commitCommand = commandsFactory.buildOptimisticPrepareCommand(gtx, null);
- }
- else
- {
- commitCommand = commandsFactory.buildPrepareCommand(gtx, modifications,
rpcManager.getLocalAddress(), true);
- }
- }
- else
- {
- commitCommand = commandsFactory.buildCommitCommand(gtx);
- }
+ VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx,
modifications, true) : commandsFactory.buildCommitCommand(gtx);
- if (trace)
- {
- log.trace(" running commit for " + gtx);
- }
+ if (trace) log.trace("Running commit for " + gtx);
handleCommitRollback(ctx, commitCommand);
}
@@ -819,38 +591,27 @@
}
}
-
- private void cleanupStaleLocks(InvocationContext ctx) throws Throwable
+ protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
{
TransactionEntry entry = ctx.getTransactionEntry();
- if (entry != null)
- {
- entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
- }
+ if (entry != null) entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
}
/**
- * creates a rollback() MethodCall and feeds it to handleCommitRollback();
- *
- * @param gtx
+ * creates a rollback()
*/
protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx,
Transaction tx, List<ReversibleCommand> modifications)
{
- //Transaction ltx = null;
try
{
ctx.setTxHasMods(modifications != null && modifications.size() > 0);
// JBCACHE-457
VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
- if (trace)
- {
- log.trace(" running rollback for " + gtx);
- }
+ if (trace) log.trace(" running rollback for " + gtx);
//JBCACHE-359 Store a lookup for the globalTransaction so a listener
// callback can find it
- //ltx = getLocalTxForGlobalTx(globalTransaction);
rollbackTransactions.put(tx, gtx);
handleCommitRollback(ctx, rollbackCommand);
@@ -861,7 +622,7 @@
}
finally
{
- if (tx != null) rollbackTransactions.remove(tx);
+ rollbackTransactions.remove(tx);
}
}
@@ -884,48 +645,35 @@
return newList;
}
+ private boolean isOnePhaseCommit()
+ {
+ if (!configuration.getCacheMode().isSynchronous())
+ {
+ // this is a REPL_ASYNC call - do 1-phase commit. break!
+ if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do
nothing for beforeCompletion()");
+ return true;
+ }
+ return false;
+ }
+
/**
* Handles a local prepare - invoked by the sync handler. Tests if the current tx
matches the gtx passed in to the
* method call and passes the prepare() call up the chain.
- *
- * @return
- * @throws Throwable
*/
@SuppressWarnings("deprecation")
public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx,
List<ReversibleCommand> modifications) throws Throwable
{
- // build the method call
- VisitableCommand prepareCommand;
- // if (cache.getCacheModeInternal() != CacheImpl.REPL_ASYNC)
- // {
// running a 2-phase commit.
- if (configuration.isNodeLockingOptimistic())
- {
- prepareCommand = commandsFactory.buildOptimisticPrepareCommand(gtx,
modifications, null, rpcManager.getLocalAddress(), false);
- }
- else if (configuration.getCacheMode() != Configuration.CacheMode.REPL_ASYNC)
- {
- prepareCommand = commandsFactory.buildPrepareCommand(gtx, modifications,
rpcManager.getLocalAddress(),
- false);// don't commit or rollback - wait for call
- }
- //}
- else
- {
- // this is a REPL_ASYNC call - do 1-phase commit. break!
- if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do
nothing for beforeCompletion()");
- return null;
- }
+ VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
- // passes a prepare call up the local interceptor chain. The replication
interceptor
- // will do the broadcasting if needed. This is so all requests (local/remote) are
- // treated the same
Object result;
// Is there a local transaction associated with GTX ?
Transaction ltx = ctx.getTransaction();
//if ltx is not null and it is already running
- if (txManager.getTransaction() != null && ltx != null &&
txManager.getTransaction().equals(ltx))
+ Transaction currentTransaction = txManager.getTransaction();
+ if (currentTransaction != null && ltx != null &&
currentTransaction.equals(ltx))
{
VisitableCommand originalCommand = ctx.getCommand();
ctx.setCommand(prepareCommand);
@@ -953,6 +701,20 @@
// Private helper methods
// --------------------------------------------------------------
+ protected void assertTxIsStillValid(Transaction tx)
+ {
+ if (!TransactionTable.isActive(tx))
+ {
+ try
+ {
+ throw new ReplicationException("prepare() failed -- local transaction
status is not STATUS_ACTIVE; is " + tx.getStatus());
+ }
+ catch (SystemException e)
+ {
+ throw new ReplicationException("prepare() failed -- local transaction
status is not STATUS_ACTIVE; Unable to retrieve transaction status.");
+ }
+ }
+ }
/**
* Creates a gtx (if one doesnt exist), a sync handler, and registers the tx.
@@ -964,41 +726,42 @@
private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx)
throws Exception
{
GlobalTransaction gtx;
- if (TransactionTable.isValid(tx) && transactions.put(tx, NULL) == null)
+
+ if (TransactionTable.isValid(tx) && transactions.add(tx))
{
gtx = txTable.getCurrentTransaction(tx, true);
+ TransactionEntry entry;
if (ctx.getGlobalTransaction() == null)
{
ctx.setGlobalTransaction(gtx);
- ctx.setTransactionEntry(txTable.get(gtx));
+ entry = txTable.get(gtx);
+ ctx.setTransactionEntry(entry);
}
+ else
+ {
+ entry = ctx.getTransactionEntry();
+ }
if (gtx.isRemote())
{
// should be no need to register a handler since this a remotely initiated
globalTransaction
- if (trace)
- {
- log.trace("is a remotely initiated gtx so no need to register a tx
for it");
- }
+ if (trace) log.trace("is a remotely initiated gtx so no need to register
a tx for it");
}
else
{
- if (trace)
- {
- log.trace("Registering sync handler for tx " + tx + ", gtx
" + gtx);
- }
+ if (trace) log.trace("Registering sync handler for tx " + tx +
", gtx " + gtx);
+
// see the comment in the LocalSyncHandler for the last isOriginLocal param.
- LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx,
tx, !ctx.isOriginLocal());
+ LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx,
tx, entry, !ctx.isOriginLocal());
registerHandler(tx, myHandler, ctx);
}
}
- else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null)
+ else if ((gtx = rollbackTransactions.get(tx)) != null)
{
if (trace) log.trace("Transaction " + tx + " is already
registered and is rolling back.");
}
else
{
if (trace) log.trace("Transaction " + tx + " is already
registered.");
-
}
return gtx;
}
@@ -1022,11 +785,11 @@
}
/**
- * Replaces the global transaction in a method call with a new global transaction
passed in.
+ * Replaces the global transaction in a VisitableCommand with a new global transaction
passed in.
*/
- private VisitableCommand replaceGtx(VisitableCommand m, final GlobalTransaction gtx)
throws Throwable
+ private VisitableCommand replaceGtx(VisitableCommand command, final GlobalTransaction
gtx) throws Throwable
{
- m.acceptVisitor(null, new AbstractVisitor()
+ command.acceptVisitor(null, new AbstractVisitor()
{
@Override
public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand
command) throws Throwable
@@ -1091,7 +854,7 @@
return null;
}
});
- return m;
+ return command;
}
/**
@@ -1100,7 +863,7 @@
* @return
* @throws Exception
*/
- private Transaction createLocalTx() throws Exception
+ protected Transaction createLocalTx() throws Exception
{
if (trace)
{
@@ -1113,23 +876,6 @@
return localTx;
}
- /**
- * Creates a new local transaction for a given global transaction.
- *
- * @param gtx
- * @return
- * @throws Exception
- */
- private Transaction createLocalTxForGlobalTx(GlobalTransaction gtx, InvocationContext
ctx) throws Exception
- {
- Transaction localTx = createLocalTx();
- txTable.put(localTx, gtx);
- // attach this to the context
- ctx.setTransaction(localTx);
- if (trace) log.trace("Created new tx for gtx " + gtx);
- return localTx;
- }
-
// ------------------------------------------------------------------------
// Synchronization classes
// ------------------------------------------------------------------------
@@ -1144,26 +890,27 @@
TransactionEntry entry = null;
protected InvocationContext ctx; // the context for this call.
- RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx)
+ RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx,
TransactionEntry entry)
{
this.gtx = gtx;
this.tx = tx;
+ this.entry = entry;
}
public void beforeCompletion()
{
if (trace) log.trace("Running beforeCompletion on gtx " + gtx);
- entry = txTable.get(gtx);
+
if (entry == null)
{
log.error("Transaction has a null transaction entry - beforeCompletion()
will fail.");
- log.error("TxTable contents: " + txTable);
throw new IllegalStateException("cannot find transaction entry for
" + gtx);
}
modifications = entry.getModifications();
ctx = invocationContextContainer.get();
- ctx.setTransactionEntry(entry);
+ setTransactionalContext(tx, gtx, entry, ctx);
+
if (ctx.isOptionsUninitialised() && entry.getOption() != null)
ctx.setOptionOverrides(entry.getOption());
assertCanContinue();
@@ -1179,21 +926,18 @@
if (ctx == null)
{
ctx = invocationContextContainer.get();
- }
+ setTransactionalContext(tx, gtx, entry, ctx);
- entry = txTable.get(gtx);
- ctx.setTransactionEntry(entry);
-
- if (ctx.isOptionsUninitialised() && entry != null &&
entry.getOption() != null)
- {
- // use the options from the transaction entry instead
- ctx.setOptionOverrides(entry.getOption());
+ if (ctx.isOptionsUninitialised() && entry != null &&
entry.getOption() != null)
+ {
+ // use the options from the transaction entry instead
+ ctx.setOptionOverrides(entry.getOption());
+ }
}
try
{
assertCanContinue();
- setTransactionalContext(tx, gtx, ctx);
try
{
@@ -1220,11 +964,9 @@
switch (status)
{
case Status.STATUS_COMMITTED:
-
- // if this is optimistic or sync repl
- boolean onePhaseCommit = !configuration.isNodeLockingOptimistic()
&& configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
+ boolean onePhaseCommit =
!configuration.getCacheMode().isSynchronous();
if (log.isDebugEnabled()) log.debug("Running commit phase. One
phase? " + onePhaseCommit);
- runCommitPhase(ctx, gtx, tx, modifications, cacheLoaderModifications,
onePhaseCommit);
+ runCommitPhase(ctx, gtx, modifications, cacheLoaderModifications,
onePhaseCommit);
log.debug("Finished commit phase");
break;
case Status.STATUS_UNKNOWN:
@@ -1240,12 +982,17 @@
throw new IllegalStateException("illegal status: " +
status);
}
}
+ catch (Exception th)
+ {
+ log.trace("Caught exception ", th);
+
+ }
finally
{
// clean up the tx table
txTable.remove(gtx);
txTable.remove(tx);
- setTransactionalContext(null, null, ctx);
+ setTransactionalContext(null, null, null, ctx);
cleanupInternalState();
}
}
@@ -1308,9 +1055,9 @@
* @param tx
* @param remoteLocal
*/
- LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, boolean
remoteLocal)
+ LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionEntry
entry, boolean remoteLocal)
{
- super(gtx, tx);
+ super(gtx, tx, entry);
this.remoteLocal = remoteLocal;
}
@@ -1321,7 +1068,7 @@
ctx.setOriginLocal(!remoteLocal); // this is the LOCAL sync handler after all!
// fetch the modifications before the transaction is committed
// (and thus removed from the txTable)
- setTransactionalContext(tx, gtx, ctx);
+ setTransactionalContext(tx, gtx, entry, ctx);
if (!entry.hasModifications())
{
if (trace) log.trace("No modifications in this tx. Skipping
beforeCompletion()");
@@ -1329,7 +1076,7 @@
return;
}
- // set any transaction wide options as current for this thread.
+ // set any transaction wide options as current for this thread, caching original
options that would then be reset
originalOptions = ctx.getOptionOverrides();
transactionalOptions = entry.getOption();
ctx.setOptionOverrides(transactionalOptions);
@@ -1343,8 +1090,9 @@
case Status.STATUS_PREPARING:
// run a prepare call.
modifications = compact(modifications);
- Object result = runPreparePhase(ctx, gtx, modifications);
+ Object result = isOnePhaseCommit() ? null : runPreparePhase(ctx, gtx,
modifications);
+
if (result instanceof Throwable)
{
if (log.isDebugEnabled())
@@ -1376,7 +1124,7 @@
finally
{
localRollbackOnly = false;
- setTransactionalContext(null, null, ctx);
+ setTransactionalContext(null, null, null, ctx);
ctx.setOptionOverrides(originalOptions);
}
}
@@ -1387,9 +1135,8 @@
// could happen if a rollback is called and beforeCompletion() doesn't get
called.
if (ctx == null) ctx = invocationContextContainer.get();
ctx.setLocalRollbackOnly(localRollbackOnly);
+ setTransactionalContext(tx, gtx, entry, ctx);
ctx.setOptionOverrides(transactionalOptions);
- ctx.setTransaction(tx);
- ctx.setGlobalTransaction(gtx);
try
{
super.afterCompletion(status);
Modified:
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-05-12
20:45:48 UTC (rev 5833)
+++
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-05-13
14:10:18 UTC (rev 5834)
@@ -146,27 +146,18 @@
mgr.resume(tx);
- boolean fail = false;
- try
- {
- mgr.commit();
- }
- catch (Exception e)
- {
- fail = true;
+ // one-phase-commits wont throw an exception on failure.
+ mgr.commit();
- }
assertNull(mgr.getTransaction());
assertEquals(0, cache.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache.getTransactionTable().getNumLocalTransactions());
- assertEquals(true, fail);
-
assertTrue(cache.exists(Fqn.fromString("/one/two")));
assertNotNull(cache.getNode("/one"));
assertEquals(false, cache.getRoot().getLock().isLocked());
- assertEquals(false, ((NodeSPI<Object, Object>)
cache.getNode("/one")).getLock().isLocked());
- assertEquals(false, ((NodeSPI<Object, Object>)
cache.getNode("/one/two")).getLock().isLocked());
+ assertEquals(false, cache.getNode("/one").getLock().isLocked());
+ assertEquals(false, cache.getNode("/one/two").getLock().isLocked());
assertNotNull(cache.getNode("/one").getChild("two"));
assertEquals(pojo2, cache.get(Fqn.fromString("/one/two"),
"key1"));
@@ -350,18 +341,9 @@
mgr.resume(tx);
- boolean fail = false;
- try
- {
- mgr.commit();
- }
- catch (Exception e)
- {
- fail = true;
+ // 1-pc commits wont throw an exception if there is a problem.
+ mgr.commit();
- }
-
- assertEquals(true, fail);
assertNull(mgr.getTransaction());
assertEquals(0, cache.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache.getTransactionTable().getNumLocalTransactions());