Author: manik.surtani(a)jboss.com
Date: 2008-05-15 10:19:56 -0400 (Thu, 15 May 2008)
New Revision: 5852
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
Log:
Fixed local mode override issues
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-05-15
13:16:30 UTC (rev 5851)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-05-15
14:19:56 UTC (rev 5852)
@@ -186,4 +186,14 @@
}
return defaultSynchronous;
}
+
+ protected boolean isLocalModeForced(InvocationContext ctx)
+ {
+ if (ctx.getOptionOverrides() != null &&
ctx.getOptionOverrides().isCacheModeLocal())
+ {
+ if (log.isDebugEnabled()) log.debug("LOCAL mode forced on invocation.
Suppressing clustered events.");
+ return true;
+ }
+ return false;
+ }
}
\ No newline at end of file
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-05-15
13:16:30 UTC (rev 5851)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-05-15
14:19:56 UTC (rev 5852)
@@ -77,12 +77,6 @@
if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction,
List<ReversibleCommand>>();
}
- private boolean skipInvalidation(InvocationContext ctx)
- {
- Option optionOverride = ctx.getOptionOverrides();
- return optionOverride != null && optionOverride.isCacheModeLocal()
&& (ctx.getTransaction() == null);
- }
-
@Override
public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command)
throws Throwable
{
@@ -141,7 +135,7 @@
clone.removeModifications(entry.getLocalModifications());
command = clone;
}
- broadcastInvalidate(command.getModifications(), gtx, tx, ctx);
+ broadcastInvalidate(command.getModifications(), tx, ctx);
}
else
{
@@ -182,7 +176,7 @@
{
GlobalTransaction gtx = ctx.getGlobalTransaction();
List<ReversibleCommand> modifications = txMods.remove(gtx);
- broadcastInvalidate(modifications, gtx, tx, ctx);
+ broadcastInvalidate(modifications, tx, ctx);
if (trace) log.trace("Committing. Broadcasting invalidations.");
}
return retval;
@@ -228,13 +222,17 @@
//replicate an evict call.
for (Fqn fqn : fqns) invalidateAcrossCluster(fqn, null,
isSynchronous(optionOverride), ctx);
}
+ else
+ {
+ if (isLocalModeForced(ctx))
ctx.getTransactionEntry().addLocalModification((ReversibleCommand) command);
+ }
}
return retval;
}
- private void broadcastInvalidate(List<ReversibleCommand> modifications,
GlobalTransaction gtx, Transaction tx, InvocationContext ctx) throws Throwable
+ private void broadcastInvalidate(List<ReversibleCommand> modifications,
Transaction tx, InvocationContext ctx) throws Throwable
{
- if (!skipInvalidation(ctx))
+ if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
{
if (modifications == null || modifications.isEmpty()) return;
InvalidationFilterVisitor filterVisitor = new
InvalidationFilterVisitor(modifications.size());
@@ -358,7 +356,7 @@
protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace,
boolean synchronous, InvocationContext ctx) throws Throwable
{
- if (!skipInvalidation(ctx))
+ if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
{
// increment invalidations counter if statistics maintained
incrementInvalidations();
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-05-15
13:16:30 UTC (rev 5851)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-05-15
14:19:56 UTC (rev 5852)
@@ -26,7 +26,6 @@
import org.jboss.cache.commands.write.RemoveDataCommand;
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Option;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.optimistic.DataVersion;
@@ -68,18 +67,6 @@
this.commandsFactory = commandsFactory;
}
- private boolean isLocalOptionOverrides(InvocationContext ctx)
- {
- Option optionOverride = ctx.getOptionOverrides();
- if (optionOverride != null && optionOverride.isCacheModeLocal() &&
ctx.getTransaction() == null)
- {
- // skip replication!!
- log.debug("Skipping replication for this call as cache mode is local,
forced via an option override.");
- return true;
- }
- return false;
- }
-
@Override
public Object visitOptimisticPrepareCommand(InvocationContext ctx,
OptimisticPrepareCommand command) throws Throwable
{
@@ -105,16 +92,11 @@
@Override
public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
{
- if (isLocalOptionOverrides(ctx))
- {
- return invokeNextInterceptor(ctx, command);
- }
//lets broadcast the commit first
Throwable remoteCommitException = null;
GlobalTransaction gtx = getGlobalTransaction(ctx);
if (!gtx.isRemote() && ctx.isOriginLocal() &&
broadcastTxs.contains(gtx))
{
- //we dont do anything
try
{
if (!skipReplicationOfTransactionMethod(ctx)) broadcastCommit(gtx, ctx);
@@ -137,10 +119,6 @@
@Override
public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command)
throws Throwable
{
- if (isLocalOptionOverrides(ctx))
- {
- return invokeNextInterceptor(ctx, command);
- }
// lets broadcast the rollback first
GlobalTransaction gtx = getGlobalTransaction(ctx);
Throwable remoteRollbackException = null;
@@ -169,15 +147,18 @@
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
{
- if (isLocalOptionOverrides(ctx))
- {
- return invokeNextInterceptor(ctx, command);
- }
if (command.isPutForExternalRead())
ctx.getTransactionEntry().setForceAsyncReplication(true);
+ if (isLocalModeForced(ctx))
ctx.getTransactionEntry().addLocalModification(command);
return invokeNextInterceptor(ctx, command);
}
+ public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable
+ {
+ if (isLocalModeForced(ctx))
ctx.getTransactionEntry().addLocalModification((ReversibleCommand) command);
+ return invokeNextInterceptor(ctx, command);
+ }
+
private GlobalTransaction getGlobalTransaction(InvocationContext ctx)
{
// get the current globalTransaction
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2008-05-15
13:16:30 UTC (rev 5851)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2008-05-15
14:19:56 UTC (rev 5852)
@@ -13,7 +13,6 @@
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.Option;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
@@ -28,17 +27,6 @@
public class ReplicationInterceptor extends BaseRpcInterceptor
{
- protected boolean skipReplication(InvocationContext ctx)
- {
- Option optionOverride = ctx.getOptionOverrides();
- if (optionOverride != null && optionOverride.isCacheModeLocal() &&
ctx.getTransaction() == null)
- {
- if (trace) log.trace("skip replication");
- return true;
- }
- return false;
- }
-
@Override
public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
{
@@ -76,7 +64,8 @@
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
{
- if (skipReplication(ctx)) return invokeNextInterceptor(ctx, command);
+ boolean local = isLocalModeForced(ctx);
+ if (local && ctx.getTransaction() == null) return
invokeNextInterceptor(ctx, command);
if (isTransactionalAndLocal(ctx))
{
Object returnValue = invokeNextInterceptor(ctx, command);
@@ -85,11 +74,7 @@
ctx.getTransactionEntry().setForceAsyncReplication(true);
}
- if (ctx.getOptionOverrides().isCacheModeLocal())
- {
- if (log.isDebugEnabled()) log.debug("Local mode override detected, will
not replicate this command.");
- ctx.getTransactionEntry().addLocalModification(command);
- }
+ if (local) ctx.getTransactionEntry().addLocalModification(command);
return returnValue;
}
@@ -136,7 +121,8 @@
private Object handleCrudMethod(InvocationContext ctx, VisitableCommand command,
boolean forceAsync)
throws Throwable
{
- if (skipReplication(ctx)) return invokeNextInterceptor(ctx, command);
+ boolean local = isLocalModeForced(ctx);
+ if (local && ctx.getTransaction() == null) return
invokeNextInterceptor(ctx, command);
// FIRST pass this call up the chain. Only if it succeeds (no exceptions) locally
do we attempt to replicate.
Object returnValue = invokeNextInterceptor(ctx, command);
if (ctx.getTransaction() == null && ctx.isOriginLocal())
@@ -152,11 +138,7 @@
}
else
{
- if (ctx.getOptionOverrides().isCacheModeLocal())
- {
- if (log.isDebugEnabled()) log.debug("Local mode override detected, will
not replicate this command.");
- ctx.getTransactionEntry().addLocalModification((ReversibleCommand) command);
- }
+ if (local) ctx.getTransactionEntry().addLocalModification((ReversibleCommand)
command);
}
return returnValue;
}