[jbosscache-commits] JBoss Cache SVN: r5852 - core/trunk/src/main/java/org/jboss/cache/interceptors.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu May 15 10:19:57 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-05-15 10:19:56 -0400 (Thu, 15 May 2008)
New Revision: 5852

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
Log:
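Fixed local mode override issues: the per-interceptor checks (skipInvalidation, isLocalOptionOverrides, skipReplication) are replaced by a shared BaseRpcInterceptor.isLocalModeForced().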

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-05-15 13:16:30 UTC (rev 5851)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-05-15 14:19:56 UTC (rev 5852)
@@ -186,4 +186,14 @@
       }
       return defaultSynchronous;
    }
+
+   protected boolean isLocalModeForced(InvocationContext ctx)
+   {
+      if (ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isCacheModeLocal())
+      {
+         if (log.isDebugEnabled()) log.debug("LOCAL mode forced on invocation.  Suppressing clustered events.");
+         return true;
+      }
+      return false;
+   }
 }
\ No newline at end of file

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2008-05-15 13:16:30 UTC (rev 5851)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2008-05-15 14:19:56 UTC (rev 5852)
@@ -77,12 +77,6 @@
       if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction, List<ReversibleCommand>>();
    }
 
-   private boolean skipInvalidation(InvocationContext ctx)
-   {
-      Option optionOverride = ctx.getOptionOverrides();
-      return optionOverride != null && optionOverride.isCacheModeLocal() && (ctx.getTransaction() == null);
-   }
-
    @Override
    public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
    {
@@ -141,7 +135,7 @@
                clone.removeModifications(entry.getLocalModifications());
                command = clone;
             }
-            broadcastInvalidate(command.getModifications(), gtx, tx, ctx);
+            broadcastInvalidate(command.getModifications(), tx, ctx);
          }
          else
          {
@@ -182,7 +176,7 @@
       {
          GlobalTransaction gtx = ctx.getGlobalTransaction();
          List<ReversibleCommand> modifications = txMods.remove(gtx);
-         broadcastInvalidate(modifications, gtx, tx, ctx);
+         broadcastInvalidate(modifications, tx, ctx);
          if (trace) log.trace("Committing.  Broadcasting invalidations.");
       }
       return retval;
@@ -228,13 +222,17 @@
             //replicate an evict call.
             for (Fqn fqn : fqns) invalidateAcrossCluster(fqn, null, isSynchronous(optionOverride), ctx);
          }
+         else
+         {
+            if (isLocalModeForced(ctx)) ctx.getTransactionEntry().addLocalModification((ReversibleCommand) command);
+         }
       }
       return retval;
    }
 
-   private void broadcastInvalidate(List<ReversibleCommand> modifications, GlobalTransaction gtx, Transaction tx, InvocationContext ctx) throws Throwable
+   private void broadcastInvalidate(List<ReversibleCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable
    {
-      if (!skipInvalidation(ctx))
+      if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
       {
          if (modifications == null || modifications.isEmpty()) return;
          InvalidationFilterVisitor filterVisitor = new InvalidationFilterVisitor(modifications.size());
@@ -358,7 +356,7 @@
 
    protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace, boolean synchronous, InvocationContext ctx) throws Throwable
    {
-      if (!skipInvalidation(ctx))
+      if (ctx.getTransaction() != null && !isLocalModeForced(ctx))
       {
          // increment invalidations counter if statistics maintained
          incrementInvalidations();

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java	2008-05-15 13:16:30 UTC (rev 5851)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java	2008-05-15 14:19:56 UTC (rev 5852)
@@ -26,7 +26,6 @@
 import org.jboss.cache.commands.write.RemoveDataCommand;
 import org.jboss.cache.commands.write.RemoveKeyCommand;
 import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Option;
 import org.jboss.cache.factories.CommandsFactory;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.optimistic.DataVersion;
@@ -68,18 +67,6 @@
       this.commandsFactory = commandsFactory;
    }
 
-   private boolean isLocalOptionOverrides(InvocationContext ctx)
-   {
-      Option optionOverride = ctx.getOptionOverrides();
-      if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
-      {
-         // skip replication!!
-         log.debug("Skipping replication for this call as cache mode is local, forced via an option override.");
-         return true;
-      }
-      return false;
-   }
-
    @Override
    public Object visitOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
    {
@@ -105,16 +92,11 @@
    @Override
    public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
    {
-      if (isLocalOptionOverrides(ctx))
-      {
-         return invokeNextInterceptor(ctx, command);
-      }
       //lets broadcast the commit first
       Throwable remoteCommitException = null;
       GlobalTransaction gtx = getGlobalTransaction(ctx);
       if (!gtx.isRemote() && ctx.isOriginLocal() && broadcastTxs.contains(gtx))
       {
-         //we dont do anything
          try
          {
             if (!skipReplicationOfTransactionMethod(ctx)) broadcastCommit(gtx, ctx);
@@ -137,10 +119,6 @@
    @Override
    public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
    {
-      if (isLocalOptionOverrides(ctx))
-      {
-         return invokeNextInterceptor(ctx, command);
-      }
       //    lets broadcast the rollback first
       GlobalTransaction gtx = getGlobalTransaction(ctx);
       Throwable remoteRollbackException = null;
@@ -169,15 +147,18 @@
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
    {
-      if (isLocalOptionOverrides(ctx))
-      {
-         return invokeNextInterceptor(ctx, command);
-      }
       if (command.isPutForExternalRead()) ctx.getTransactionEntry().setForceAsyncReplication(true);
+      if (isLocalModeForced(ctx)) ctx.getTransactionEntry().addLocalModification(command);
       return invokeNextInterceptor(ctx, command);
    }
 
+   public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+   {
+      if (isLocalModeForced(ctx)) ctx.getTransactionEntry().addLocalModification((ReversibleCommand) command);
+      return invokeNextInterceptor(ctx, command);
+   }
 
+
    private GlobalTransaction getGlobalTransaction(InvocationContext ctx)
    {
       // get the current globalTransaction

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java	2008-05-15 13:16:30 UTC (rev 5851)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java	2008-05-15 14:19:56 UTC (rev 5852)
@@ -13,7 +13,6 @@
 import org.jboss.cache.commands.write.RemoveKeyCommand;
 import org.jboss.cache.commands.write.RemoveNodeCommand;
 import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.Option;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionEntry;
 
@@ -28,17 +27,6 @@
 public class ReplicationInterceptor extends BaseRpcInterceptor
 {
 
-   protected boolean skipReplication(InvocationContext ctx)
-   {
-      Option optionOverride = ctx.getOptionOverrides();
-      if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
-      {
-         if (trace) log.trace("skip replication");
-         return true;
-      }
-      return false;
-   }
-
    @Override
    public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
    {
@@ -76,7 +64,8 @@
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
    {
-      if (skipReplication(ctx)) return invokeNextInterceptor(ctx, command);
+      boolean local = isLocalModeForced(ctx);
+      if (local && ctx.getTransaction() == null) return invokeNextInterceptor(ctx, command);
       if (isTransactionalAndLocal(ctx))
       {
          Object returnValue = invokeNextInterceptor(ctx, command);
@@ -85,11 +74,7 @@
             ctx.getTransactionEntry().setForceAsyncReplication(true);
          }
 
-         if (ctx.getOptionOverrides().isCacheModeLocal())
-         {
-            if (log.isDebugEnabled()) log.debug("Local mode override detected, will not replicate this command.");
-            ctx.getTransactionEntry().addLocalModification(command);
-         }
+         if (local) ctx.getTransactionEntry().addLocalModification(command);
 
          return returnValue;
       }
@@ -136,7 +121,8 @@
    private Object handleCrudMethod(InvocationContext ctx, VisitableCommand command, boolean forceAsync)
          throws Throwable
    {
-      if (skipReplication(ctx)) return invokeNextInterceptor(ctx, command);
+      boolean local = isLocalModeForced(ctx);
+      if (local && ctx.getTransaction() == null) return invokeNextInterceptor(ctx, command);
       // FIRST pass this call up the chain.  Only if it succeeds (no exceptions) locally do we attempt to replicate.
       Object returnValue = invokeNextInterceptor(ctx, command);
       if (ctx.getTransaction() == null && ctx.isOriginLocal())
@@ -152,11 +138,7 @@
       }
       else
       {
-         if (ctx.getOptionOverrides().isCacheModeLocal())
-         {
-            if (log.isDebugEnabled()) log.debug("Local mode override detected, will not replicate this command.");
-            ctx.getTransactionEntry().addLocalModification((ReversibleCommand) command);
-         }
+         if (local) ctx.getTransactionEntry().addLocalModification((ReversibleCommand) command);
       }
       return returnValue;
    }




More information about the jbosscache-commits mailing list