[jbosscache-commits] JBoss Cache SVN: r4503 - in core/trunk/src/main/java/org/jboss/cache: config and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Sep 25 09:41:09 EDT 2007


Author: manik.surtani at jboss.com
Date: 2007-09-25 09:41:09 -0400 (Tue, 25 Sep 2007)
New Revision: 4503

Modified:
   core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
   core/trunk/src/main/java/org/jboss/cache/config/Option.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
Log:
JBCACHE-549 - Allow overriding of GroupRequest mode and sync repl timeout via Options

Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2007-09-25 13:05:59 UTC (rev 4502)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2007-09-25 13:41:09 UTC (rev 4503)
@@ -1136,7 +1136,8 @@
             // Lock the parent, create and add the child
             try
             {
-               parent.getLock().acquire(owner, configuration.getSyncReplTimeout(), NodeLock.LockType.WRITE);
+               Option o = getInvocationContext().getOptionOverrides();
+               parent.getLock().acquire(owner, o.getSyncReplTimeout() < 0 ? configuration.getSyncReplTimeout() : o.getSyncReplTimeout(), NodeLock.LockType.WRITE);
             }
             catch (InterruptedException e)
             {
@@ -2053,6 +2054,11 @@
    public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int mode, boolean exclude_self, long timeout, RspFilter rspFilter)
            throws Exception
    {
+      int modeToUse = mode;
+      int preferredMode;
+      if ((preferredMode = getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
+         modeToUse = preferredMode;
+
       RspList rsps = null;
       Rsp rsp;
       List retval;
@@ -2101,8 +2107,8 @@
          flushBlockGate.await(configuration.getStateRetrievalTimeout());
       }
       rsps = rspFilter == null
-              ? disp.callRemoteMethods(validMembers, method_call, mode, timeout, buddyManager != null && buddyManager.isEnabled())
-              : disp.callRemoteMethods(validMembers, method_call, mode, timeout, buddyManager != null && buddyManager.isEnabled(), false, rspFilter);
+              ? disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled())
+              : disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled(), false, rspFilter);
 
       // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
       // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193

Modified: core/trunk/src/main/java/org/jboss/cache/config/Option.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Option.java	2007-09-25 13:05:59 UTC (rev 4502)
+++ core/trunk/src/main/java/org/jboss/cache/config/Option.java	2007-09-25 13:41:09 UTC (rev 4503)
@@ -25,10 +25,13 @@
 
    private boolean forceWriteLock;
    private boolean skipCacheStatusCheck;
-   
+
    private boolean forceAsynchronous;
    private boolean forceSynchronous;
-   
+
+   private long syncReplTimeout = -1;
+   private int groupRequestMode = -1;
+
    private int lockAcquisitionTimeout = -1;
 
    /**
@@ -153,9 +156,9 @@
     * Gets whether replication or invalidation should be done asynchronously,
     * even if the cache is configured in a synchronous mode.  Has no
     * effect if the call is occuring within a transactional context.
-    * 
+    *
     * @return <code>true</code> if replication/invalidation should be done
-    *         asynchronously; <code>false</code> if the default mode 
+    *         asynchronously; <code>false</code> if the default mode
     *         configured for the cache should be used.
     */
    public boolean isForceAsynchronous()
@@ -167,10 +170,10 @@
     * Sets whether replication or invalidation should be done asynchronously,
     * even if the cache is configured in a synchronous mode.  Has no
     * effect if the call is occuring within a transactional context.
-    * 
-    * @param forceAsynchronous <code>true</code> if replication/invalidation 
+    *
+    * @param forceAsynchronous <code>true</code> if replication/invalidation
     *                          should be done asynchronously; <code>false</code>
-    *                          if the default mode configured for the cache 
+    *                          if the default mode configured for the cache
     *                          should be used.
     */
    public void setForceAsynchronous(boolean forceAsynchronous)
@@ -182,9 +185,9 @@
     * Gets whether replication or invalidation should be done synchronously,
     * even if the cache is configured in an asynchronous mode.  Has no
     * effect if the call is occuring within a transactional context.
-    * 
+    *
     * @return <code>true</code> if replication/invalidation should be done
-    *         synchronously; <code>false</code> if the default mode 
+    *         synchronously; <code>false</code> if the default mode
     *         configured for the cache should be used.
     */
    public boolean isForceSynchronous()
@@ -196,10 +199,10 @@
     * Sets whether replication or invalidation should be done synchronously,
     * even if the cache is configured in an asynchronous mode.  Has no
     * effect if the call is occuring within a transactional context.
-    * 
-    * @param forceAsynchronous <code>true</code> if replication/invalidation 
+    *
+    * @param forceSynchronous  <code>true</code> if replication/invalidation
     *                          should be done synchronously; <code>false</code>
-    *                          if the default mode configured for the cache 
+    *                          if the default mode configured for the cache
     *                          should be used.
     */
    public void setForceSynchronous(boolean forceSynchronous)
@@ -209,7 +212,7 @@
 
    /**
     * Gets any lock acquisition timeout configured for the call.
-    * 
+    *
     * @return the time in ms that lock acquisition attempts should block
     *         before failing with a TimeoutException.  A value < 0 indicates
     *         that the cache's default timeout should be used.
@@ -221,9 +224,9 @@
 
    /**
     * Sets any lock acquisition timeout configured for the call.
-    * 
-    * @param lockAcquisitionTimeout the time in ms that lock acquisition 
-    *                               attempts should block before failing with a 
+    *
+    * @param lockAcquisitionTimeout the time in ms that lock acquisition
+    *                               attempts should block before failing with a
     *                               TimeoutException.  A value < 0 indicates
     *                               that the cache's default timeout should be used.
     */
@@ -346,4 +349,50 @@
    {
       return skipCacheStatusCheck;
    }
+
+   /**
+    * @return the value of the sync replication timeout (used when cache mode is either {@link org.jboss.cache.config.Configuration.CacheMode#REPL_SYNC}
+    *         or {@link org.jboss.cache.config.Configuration.CacheMode#INVALIDATION_SYNC}) to be used for this specific call, or -1 (default) if the
+    *         default value in {@link Configuration#getSyncReplTimeout()} is to be used instead.
+    * @since 2.1.0
+    */
+   public long getSyncReplTimeout()
+   {
+      return syncReplTimeout;
+   }
+
+   /**
+    * Used to override the value in {@link Configuration#getSyncReplTimeout()} (used when cache mode is either {@link org.jboss.cache.config.Configuration.CacheMode#REPL_SYNC}
+    * or {@link org.jboss.cache.config.Configuration.CacheMode#INVALIDATION_SYNC}) for this specific invocation.  Defaults to -1,
+    * which means use the default in the configuration.
+    *
+    * @param syncReplTimeout new timeout value for this invocation.
+    * @since 2.1.0
+    */
+   public void setSyncReplTimeout(long syncReplTimeout)
+   {
+      this.syncReplTimeout = syncReplTimeout;
+   }
+
+   /**
+    * @return overridden JGroups {@link org.jgroups.blocks.GroupRequest} mode to use, or -1 if the {@link org.jboss.cache.RPCManager}'s
+    *         own logic is to be used to select a group request mode (this is the default).
+    * @since 2.1.0
+    */
+   public int getGroupRequestMode()
+   {
+      return groupRequestMode;
+   }
+
+   /**
+    * By default, the {@link org.jboss.cache.RPCManager} has inbuilt logic when it comes to selecting a group request mode.
+    * This can be overridden by setting the group request mode here, using this method, for a specific invocation.
+    *
+    * @param groupRequestMode a group request mode, found in the {@link org.jgroups.blocks.GroupRequest} class.
+    * @since 2.1.0
+    */
+   public void setGroupRequestMode(int groupRequestMode)
+   {
+      this.groupRequestMode = groupRequestMode;
+   }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2007-09-25 13:05:59 UTC (rev 4502)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2007-09-25 13:41:09 UTC (rev 4503)
@@ -6,8 +6,8 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.config.Option;
-import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.marshall.MethodCall;
 import org.jboss.cache.marshall.MethodCallFactory;
 import org.jboss.cache.marshall.MethodDeclarations;
@@ -64,12 +64,12 @@
       }
    }
 
-   protected void replicateCall(MethodCall call, boolean sync) throws Throwable
+   protected void replicateCall(MethodCall call, boolean sync, Option o) throws Throwable
    {
-      replicateCall(null, call, sync);
+      replicateCall(null, call, sync, o);
    }
 
-   protected void replicateCall(List<Address> recipients, MethodCall call, boolean sync) throws Throwable
+   protected void replicateCall(List<Address> recipients, MethodCall call, boolean sync, Option o) throws Throwable
    {
 
       if (log.isTraceEnabled()) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
@@ -100,12 +100,15 @@
                log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
          }
 
+         long syncReplTimeout = o.getSyncReplTimeout();
+         if (syncReplTimeout < 0) syncReplTimeout = configuration.getSyncReplTimeout();
+
          List rsps = cache.getRPCManager().callRemoteMethods(callRecipients,
                  MethodDeclarations.replicateMethod,
                  new Object[]{call},
                  sync, // is synchronised?
                  true, // ignore self?
-                 configuration.getSyncReplTimeout());
+                 syncReplTimeout);
          if (log.isTraceEnabled())
          {
             log.trace("responses=" + rsps);

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2007-09-25 13:05:59 UTC (rev 4502)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2007-09-25 13:41:09 UTC (rev 4503)
@@ -117,7 +117,7 @@
                      //ctx.setMethodCall(m);
 
                      // Clean up the other nodes
-                     cleanBackupData(data, ctx.getGlobalTransaction());
+                     cleanBackupData(data, ctx.getGlobalTransaction(), ctx);
                   }
                }
             }
@@ -140,13 +140,13 @@
                case MethodDeclarations.prepareMethod_id:
                case MethodDeclarations.optimisticPrepareMethod_id:
                   Object o = super.invoke(ctx);
-                  doPrepare(ctx.getGlobalTransaction());
+                  doPrepare(ctx.getGlobalTransaction(), ctx);
                   return o;
                case MethodDeclarations.rollbackMethod_id:
                   transactionMods.remove(ctx.getGlobalTransaction());
                   return super.invoke(ctx);
                case MethodDeclarations.commitMethod_id:
-                  doCommit(ctx.getGlobalTransaction());
+                  doCommit(ctx.getGlobalTransaction(), ctx);
                   transactionMods.remove(ctx.getGlobalTransaction());
                   return super.invoke(ctx);
             }
@@ -180,7 +180,7 @@
       return enabled;
    }
 
-   private void doPrepare(GlobalTransaction gtx) throws Throwable
+   private void doPrepare(GlobalTransaction gtx, InvocationContext ctx) throws Throwable
    {
       MethodCall cleanup = (MethodCall) transactionMods.get(gtx);
       if (log.isTraceEnabled()) log.trace("Broadcasting prepare for cleanup ops " + cleanup);
@@ -198,7 +198,7 @@
             prepare = MethodCallFactory.create(MethodDeclarations.prepareMethod, gtx, mods, cache.getLocalAddress(), syncCommunications);
          }
 
-         replicateCall(getMembersOutsideBuddyGroup(), prepare, syncCommunications);
+         replicateCall(getMembersOutsideBuddyGroup(), prepare, syncCommunications, ctx.getOptionOverrides());
       }
       else
       {
@@ -206,12 +206,12 @@
       }
    }
 
-   private void doCommit(GlobalTransaction gtx) throws Throwable
+   private void doCommit(GlobalTransaction gtx, InvocationContext ctx) throws Throwable
    {
       if (transactionMods.containsKey(gtx))
       {
          if (log.isTraceEnabled()) log.trace("Broadcasting commit for gtx " + gtx);
-         replicateCall(getMembersOutsideBuddyGroup(), MethodCallFactory.create(MethodDeclarations.commitMethod, gtx), syncCommunications);
+         replicateCall(getMembersOutsideBuddyGroup(), MethodCallFactory.create(MethodDeclarations.commitMethod, gtx), syncCommunications, ctx.getOptionOverrides());
       }
       else
       {
@@ -269,7 +269,7 @@
       return result;
    }
 
-   private void cleanBackupData(BackupData backup, GlobalTransaction gtx) throws Throwable
+   private void cleanBackupData(BackupData backup, GlobalTransaction gtx, InvocationContext ctx) throws Throwable
    {
       //       MethodCall primaryDataCleanup, backupDataCleanup;
       //       if (buddyManager.isDataGravitationRemoveOnFind())
@@ -296,7 +296,7 @@
          if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.backupFqn + "]");
          // remove backup Fqn
          //replicateCall(cache.getMembers(), backupDataCleanup, syncCommunications);
-         replicateCall(cache.getMembers(), cleanup, syncCommunications);
+         replicateCall(cache.getMembers(), cleanup, syncCommunications, ctx.getOptionOverrides());
       }
       else
       {

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2007-09-25 13:05:59 UTC (rev 4502)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2007-09-25 13:41:09 UTC (rev 4503)
@@ -9,7 +9,6 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InvocationContext;
-import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Option;
 import org.jboss.cache.marshall.MethodCall;
 import org.jboss.cache.marshall.MethodCallFactory;
@@ -81,7 +80,7 @@
                {
                   // the no-tx case:
                   //replicate an evict call.
-                  invalidateAcrossCluster(fqn, null, isSynchronous(optionOverride));
+                  invalidateAcrossCluster(fqn, null, isSynchronous(optionOverride), ctx);
                }
             }
          }
@@ -117,7 +116,7 @@
                      {
                         try
                         {
-                           invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null, defaultSynchronous);
+                           invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null, defaultSynchronous, ctx);
                         }
                         catch (Throwable t)
                         {
@@ -177,14 +176,15 @@
 
    /**
     * Same as <code>invalidateAcrossCluster(fqn, workspace, defaultSynchronous)</code>
-    * @deprecated use {@link #invalidateAcrossCluster(Fqn, TransactionWorkspace, boolean)
+    *
+    * @deprecated use {@link #invalidateAcrossCluster(Fqn,TransactionWorkspace,boolean,InvocationContext)}
     */
-   protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace) throws Throwable
+   protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace, InvocationContext ctx) throws Throwable
    {
-      invalidateAcrossCluster(fqn, workspace, defaultSynchronous);
+      invalidateAcrossCluster(fqn, workspace, defaultSynchronous, ctx);
    }
-   
-   protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace, boolean synchronous) throws Throwable
+
+   protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace, boolean synchronous, InvocationContext ctx) throws Throwable
    {
       // increment invalidations counter if statistics maintained
       if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
@@ -200,23 +200,24 @@
 
       if (log.isDebugEnabled()) log.debug("Cache [" + cache.getLocalAddress() + "] replicating " + call);
       // voila, invalidated!
-      replicateCall(call, synchronous);
+      replicateCall(call, synchronous, ctx.getOptionOverrides());
    }
 
    /**
     * Same as <code>invalidateModifications(modifications, workspace, defaultSynchronous)</code>
-    * @deprecated use {@link #invalidateAcrossCluster(org.jboss.cache.Fqn, org.jboss.cache.optimistic.TransactionWorkspace, boolean)}
+    *
+    * @deprecated use {@link #invalidateModifications(java.util.List,org.jboss.cache.optimistic.TransactionWorkspace,boolean,InvocationContext)}
     */
-   protected void invalidateModifications(List<MethodCall> modifications, TransactionWorkspace workspace) throws Throwable
+   protected void invalidateModifications(List<MethodCall> modifications, TransactionWorkspace workspace, InvocationContext ctx) throws Throwable
    {
-      invalidateModifications(modifications, workspace, defaultSynchronous);
+      invalidateModifications(modifications, workspace, defaultSynchronous, ctx);
    }
-   
-   protected void invalidateModifications(List<MethodCall> modifications, TransactionWorkspace workspace, boolean synchronous) throws Throwable
+
+   protected void invalidateModifications(List<MethodCall> modifications, TransactionWorkspace workspace, boolean synchronous, InvocationContext ctx) throws Throwable
    {
       // optimise the calls list here.
       Set<Fqn> modifiedFqns = optimisedIterator(modifications);
-      for (Fqn fqn : modifiedFqns) invalidateAcrossCluster(fqn, workspace, synchronous);
+      for (Fqn fqn : modifiedFqns) invalidateAcrossCluster(fqn, workspace, synchronous, ctx);
    }
 
    protected TransactionWorkspace getWorkspace(GlobalTransaction gtx)

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java	2007-09-25 13:05:59 UTC (rev 4502)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java	2007-09-25 13:41:09 UTC (rev 4503)
@@ -76,7 +76,7 @@
             if (!gtx.isRemote() && ctx.isOriginLocal())
             {
                // replicate the prepare call.
-               broadcastPrepare(m, gtx);
+               broadcastPrepare(m, gtx, ctx);
             }
             break;
          case MethodDeclarations.commitMethod_id:
@@ -88,7 +88,7 @@
                //we dont do anything
                try
                {
-                  broadcastCommit(gtx);
+                  broadcastCommit(gtx, ctx);
                }
                catch (Throwable t)
                {
@@ -112,7 +112,7 @@
                //we dont do anything
                try
                {
-                  broadcastRollback(gtx);
+                  broadcastRollback(gtx, ctx);
                }
                catch (Throwable t)
                {
@@ -151,7 +151,7 @@
       return gtx;
    }
 
-   protected void broadcastPrepare(MethodCall methodCall, GlobalTransaction gtx) throws Throwable
+   protected void broadcastPrepare(MethodCall methodCall, GlobalTransaction gtx, InvocationContext ctx) throws Throwable
    {
       boolean remoteCallSync = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
 
@@ -175,7 +175,7 @@
                     + " (" + num_mods + " modifications");
          }
 
-         replicateCall(toBroadcast, remoteCallSync);
+         replicateCall(toBroadcast, remoteCallSync, ctx.getOptionOverrides());
       }
       else
       {
@@ -189,7 +189,7 @@
    }
 
 
-   protected void broadcastCommit(GlobalTransaction gtx) throws Throwable
+   protected void broadcastCommit(GlobalTransaction gtx, InvocationContext ctx) throws Throwable
    {
       boolean remoteCallSync = configuration.isSyncCommitPhase();
 
@@ -204,7 +204,7 @@
             if (log.isDebugEnabled())
                log.debug("running remote commit for " + gtx + " and coord=" + cache.getLocalAddress());
 
-            replicateCall(commit_method, remoteCallSync);
+            replicateCall(commit_method, remoteCallSync, ctx.getOptionOverrides());
          }
          catch (Exception e)
          {
@@ -214,7 +214,7 @@
       }
    }
 
-   protected void broadcastRollback(GlobalTransaction gtx) throws Throwable
+   protected void broadcastRollback(GlobalTransaction gtx, InvocationContext ctx) throws Throwable
    {
       boolean remoteCallSync = configuration.isSyncRollbackPhase();
 
@@ -228,7 +228,7 @@
 
             if (log.isDebugEnabled())
                log.debug("running remote rollback for " + gtx + " and coord=" + cache.getLocalAddress());
-            replicateCall(rollback_method, remoteCallSync);
+            replicateCall(rollback_method, remoteCallSync, ctx.getOptionOverrides());
          }
          catch (Exception e)
          {

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java	2007-09-25 13:05:59 UTC (rev 4502)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java	2007-09-25 13:41:09 UTC (rev 4503)
@@ -53,7 +53,8 @@
             {
                case MethodDeclarations.commitMethod_id:
                   // REPL_ASYNC will result in only a prepare() method - 1 phase commit.
-                  if (containsModifications(ctx)) replicateCall(m, configuration.isSyncCommitPhase());
+                  if (containsModifications(ctx))
+                     replicateCall(m, configuration.isSyncCommitPhase(), ctx.getOptionOverrides());
                   // now pass up the chain
                   o = super.invoke(ctx);
                   break;
@@ -61,14 +62,14 @@
                   if (containsModifications(ctx))
                   {
                      // this is a prepare method
-                     runPreparePhase(m, gtx);
+                     runPreparePhase(m, gtx, ctx);
                   }
                   break;
                case MethodDeclarations.rollbackMethod_id:
                   // REPL_ASYNC will result in only a prepare() method - 1 phase commit.
                   if (containsModifications(ctx) && !ctx.isLocalRollbackOnly())
                   {
-                     replicateCall(m, configuration.isSyncRollbackPhase());
+                     replicateCall(m, configuration.isSyncRollbackPhase(), ctx.getOptionOverrides());
                   }
                   // now pass up the chain
                   o = super.invoke(ctx);
@@ -82,11 +83,11 @@
       {
          // NON-TRANSACTIONAL and CRUD method
          if (log.isTraceEnabled()) log.trace("Non-tx crud meth");
-         
+
          // don't re-broadcast if we've received this from another cache in the cluster.
          if (ctx.isOriginLocal())
          {
-            handleReplicatedMethod(m, isSynchronous(optionOverride));
+            handleReplicatedMethod(m, isSynchronous(optionOverride), ctx);
          }
       }
       else
@@ -97,7 +98,7 @@
       return o;
    }
 
-   void handleReplicatedMethod(MethodCall m, boolean synchronous) throws Throwable
+   void handleReplicatedMethod(MethodCall m, boolean synchronous, InvocationContext ctx) throws Throwable
    {
       if (log.isTraceEnabled())
       {
@@ -108,13 +109,13 @@
       if (!synchronous || m.getMethodId() == MethodDeclarations.putForExternalReadMethodLocal_id)
       {
          // 2. Replicate change to all *other* members (exclude self !)
-         replicateCall(m, false);
+         replicateCall(m, false, ctx.getOptionOverrides());
       }
       else
       {
          // REVISIT Needs to exclude itself and apply the local change manually.
          // This is needed such that transient field is modified properly in-VM.
-         replicateCall(m, true);
+         replicateCall(m, true, ctx.getOptionOverrides());
       }
    }
 
@@ -132,7 +133,7 @@
     *
     * @throws Exception
     */
-   protected void runPreparePhase(MethodCall prepareMethod, GlobalTransaction gtx) throws Throwable
+   protected void runPreparePhase(MethodCall prepareMethod, GlobalTransaction gtx, InvocationContext ctx) throws Throwable
    {
       boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
       if (log.isTraceEnabled())
@@ -141,6 +142,6 @@
       }
 
       // this method will return immediately if we're the only member (because exclude_self=true)
-      replicateCall(prepareMethod, !async);
+      replicateCall(prepareMethod, !async, ctx.getOptionOverrides());
    }
 }




More information about the jbosscache-commits mailing list