[jbosscache-commits] JBoss Cache SVN: r4478 - core/trunk/src/main/java/org/jboss/cache/interceptors.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Sep 17 23:23:34 EDT 2007


Author: bstansberry at jboss.com
Date: 2007-09-17 23:23:34 -0400 (Mon, 17 Sep 2007)
New Revision: 4478

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
Log:
[JBCACHE-1175] Option to force call sync/async

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2007-09-18 03:22:49 UTC (rev 4477)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2007-09-18 03:23:34 UTC (rev 4478)
@@ -6,6 +6,8 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.marshall.MethodCall;
 import org.jboss.cache.marshall.MethodCallFactory;
 import org.jboss.cache.marshall.MethodDeclarations;
@@ -27,12 +29,15 @@
 
    private BuddyManager buddyManager;
    private boolean usingBuddyReplication;
+   protected boolean defaultSynchronous;
 
    public void setCache(CacheSPI cache)
    {
       super.setCache(cache);
       buddyManager = cache.getBuddyManager();
       usingBuddyReplication = buddyManager != null;
+      CacheMode mode = cache.getConfiguration().getCacheMode();
+      defaultSynchronous = (mode == CacheMode.REPL_SYNC || mode == CacheMode.INVALIDATION_SYNC);
    }
 
    /**
@@ -73,7 +78,11 @@
       {
          GlobalTransaction gtx = cache.getTransactionTable().get(tx);
          TransactionEntry te = cache.getTransactionTable().get(gtx);
-         if (te != null && te.isForceAsyncReplication()) sync = false;
+         if (te != null)
+         {
+            if (te.isForceAsyncReplication()) sync = false;
+            else if (te.isForceSyncReplication()) sync = true;
+         }
       }
       if (!sync && cache.getRPCManager().getReplicationQueue() != null && !usingBuddyReplication)
       {
@@ -127,4 +136,16 @@
             return false;
       }
    }
+
+   protected boolean isSynchronous(Option option)
+   {
+      if (option != null)
+      {
+         if (option.isForceSynchronous())
+            return true;
+         else if (option.isForceAsynchronous())
+            return false;
+      }
+      return defaultSynchronous;
+   }
 }
\ No newline at end of file

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2007-09-18 03:22:49 UTC (rev 4477)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2007-09-18 03:23:34 UTC (rev 4478)
@@ -81,7 +81,7 @@
                {
                   // the no-tx case:
                   //replicate an evict call.
-                  invalidateAcrossCluster(fqn, null);
+                  invalidateAcrossCluster(fqn, null, isSynchronous(optionOverride));
                }
             }
          }
@@ -117,7 +117,7 @@
                      {
                         try
                         {
-                           invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null);
+                           invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null, defaultSynchronous);
                         }
                         catch (Throwable t)
                         {
@@ -175,8 +175,17 @@
       return retval;
    }
 
+   /**
+    * Same as <code>invalidateAcrossCluster(fqn, workspace, defaultSynchronous)</code>
+    * @deprecated use {@link #invalidateAcrossCluster(Fqn, TransactionWorkspace, boolean)}
+    */
    protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace) throws Throwable
    {
+      invalidateAcrossCluster(fqn, workspace, defaultSynchronous);
+   }
+   
+   protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace, boolean synchronous) throws Throwable
+   {
       // increment invalidations counter if statistics maintained
       if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
          m_invalidations++;
@@ -188,14 +197,23 @@
 
       if (log.isDebugEnabled()) log.debug("Cache [" + cache.getLocalAddress() + "] replicating " + call);
       // voila, invalidated!
-      replicateCall(call, configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC);
+      replicateCall(call, synchronous);
    }
 
+   /**
+    * Same as <code>invalidateModifications(modifications, workspace, defaultSynchronous)</code>
+    * @deprecated use {@link #invalidateModifications(List, TransactionWorkspace, boolean)}
+    */
    protected void invalidateModifications(List<MethodCall> modifications, TransactionWorkspace workspace) throws Throwable
    {
+      invalidateModifications(modifications, workspace, defaultSynchronous);
+   }
+   
+   protected void invalidateModifications(List<MethodCall> modifications, TransactionWorkspace workspace, boolean synchronous) throws Throwable
+   {
       // optimise the calls list here.
       Set<Fqn> modifiedFqns = optimisedIterator(modifications);
-      for (Fqn fqn : modifiedFqns) invalidateAcrossCluster(fqn, workspace);
+      for (Fqn fqn : modifiedFqns) invalidateAcrossCluster(fqn, workspace, synchronous);
    }
 
    protected TransactionWorkspace getWorkspace(GlobalTransaction gtx)

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java	2007-09-18 03:22:49 UTC (rev 4477)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java	2007-09-18 03:23:34 UTC (rev 4478)
@@ -82,10 +82,11 @@
       {
          // NON-TRANSACTIONAL and CRUD method
          if (log.isTraceEnabled()) log.trace("Non-tx crud meth");
+         
+         // don't re-broadcast if we've received this from another cache in the cluster.
          if (ctx.isOriginLocal())
          {
-            // don't re-broadcast if we've received this from anotehr cache in the cluster.
-            handleReplicatedMethod(m, configuration.getCacheMode());
+            handleReplicatedMethod(m, isSynchronous(optionOverride));
          }
       }
       else
@@ -96,7 +97,7 @@
       return o;
    }
 
-   void handleReplicatedMethod(MethodCall m, Configuration.CacheMode mode) throws Throwable
+   void handleReplicatedMethod(MethodCall m, boolean synchronous) throws Throwable
    {
       if (log.isTraceEnabled())
       {
@@ -104,7 +105,7 @@
                  configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
                  configuration.getSyncReplTimeout());
       }
-      if (mode == Configuration.CacheMode.REPL_ASYNC || m.getMethodId() == MethodDeclarations.putForExternalReadMethodLocal_id)
+      if (!synchronous || m.getMethodId() == MethodDeclarations.putForExternalReadMethodLocal_id)
       {
          // 2. Replicate change to all *other* members (exclude self !)
          replicateCall(m, false);




More information about the jbosscache-commits mailing list