Author: bstansberry(a)jboss.com
Date: 2007-09-17 23:23:34 -0400 (Mon, 17 Sep 2007)
New Revision: 4478
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
Log:
[JBCACHE-1175] Option to force call sync/async
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2007-09-18
03:22:49 UTC (rev 4477)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2007-09-18
03:23:34 UTC (rev 4478)
@@ -6,6 +6,8 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
@@ -27,12 +29,15 @@
private BuddyManager buddyManager;
private boolean usingBuddyReplication;
+ protected boolean defaultSynchronous;
public void setCache(CacheSPI cache)
{
super.setCache(cache);
buddyManager = cache.getBuddyManager();
usingBuddyReplication = buddyManager != null;
+ CacheMode mode = cache.getConfiguration().getCacheMode();
+ defaultSynchronous = (mode == CacheMode.REPL_SYNC || mode ==
CacheMode.INVALIDATION_SYNC);
}
/**
@@ -73,7 +78,11 @@
{
GlobalTransaction gtx = cache.getTransactionTable().get(tx);
TransactionEntry te = cache.getTransactionTable().get(gtx);
- if (te != null && te.isForceAsyncReplication()) sync = false;
+ if (te != null)
+ {
+ if (te.isForceAsyncReplication()) sync = false;
+ else if (te.isForceSyncReplication()) sync = true;
+ }
}
if (!sync && cache.getRPCManager().getReplicationQueue() != null &&
!usingBuddyReplication)
{
@@ -127,4 +136,16 @@
return false;
}
}
+
+ protected boolean isSynchronous(Option option)
+ {
+ if (option != null)
+ {
+ if (option.isForceSynchronous())
+ return true;
+ else if (option.isForceAsynchronous())
+ return false;
+ }
+ return defaultSynchronous;
+ }
}
\ No newline at end of file
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2007-09-18
03:22:49 UTC (rev 4477)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2007-09-18
03:23:34 UTC (rev 4478)
@@ -81,7 +81,7 @@
{
// the no-tx case:
//replicate an evict call.
- invalidateAcrossCluster(fqn, null);
+ invalidateAcrossCluster(fqn, null, isSynchronous(optionOverride));
}
}
}
@@ -117,7 +117,7 @@
{
try
{
- invalidateModifications(modifications,
configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null);
+ invalidateModifications(modifications,
configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null, defaultSynchronous);
}
catch (Throwable t)
{
@@ -175,8 +175,17 @@
return retval;
}
+ /**
+ * Same as <code>invalidateAcrossCluster(fqn, workspace,
defaultSynchronous)</code>
+ * @deprecated use {@link #invalidateAcrossCluster(Fqn, TransactionWorkspace,
boolean)}
+ */
protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace) throws
Throwable
{
+ invalidateAcrossCluster(fqn, workspace, defaultSynchronous);
+ }
+
+ protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace,
boolean synchronous) throws Throwable
+ {
// increment invalidations counter if statistics maintained
if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
m_invalidations++;
@@ -188,14 +197,23 @@
if (log.isDebugEnabled()) log.debug("Cache [" + cache.getLocalAddress() +
"] replicating " + call);
// voila, invalidated!
- replicateCall(call, configuration.getCacheMode() ==
Configuration.CacheMode.INVALIDATION_SYNC);
+ replicateCall(call, synchronous);
}
+ /**
+ * Same as <code>invalidateModifications(modifications, workspace,
defaultSynchronous)</code>
+ * @deprecated use {@link #invalidateModifications(List,
TransactionWorkspace, boolean)}
+ */
protected void invalidateModifications(List<MethodCall> modifications,
TransactionWorkspace workspace) throws Throwable
{
+ invalidateModifications(modifications, workspace, defaultSynchronous);
+ }
+
+ protected void invalidateModifications(List<MethodCall> modifications,
TransactionWorkspace workspace, boolean synchronous) throws Throwable
+ {
// optimise the calls list here.
Set<Fqn> modifiedFqns = optimisedIterator(modifications);
- for (Fqn fqn : modifiedFqns) invalidateAcrossCluster(fqn, workspace);
+ for (Fqn fqn : modifiedFqns) invalidateAcrossCluster(fqn, workspace, synchronous);
}
protected TransactionWorkspace getWorkspace(GlobalTransaction gtx)
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2007-09-18
03:22:49 UTC (rev 4477)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2007-09-18
03:23:34 UTC (rev 4478)
@@ -82,10 +82,11 @@
{
// NON-TRANSACTIONAL and CRUD method
if (log.isTraceEnabled()) log.trace("Non-tx crud meth");
+
+ // don't re-broadcast if we've received this from another cache in the
cluster.
if (ctx.isOriginLocal())
{
- // don't re-broadcast if we've received this from anotehr cache in
the cluster.
- handleReplicatedMethod(m, configuration.getCacheMode());
+ handleReplicatedMethod(m, isSynchronous(optionOverride));
}
}
else
@@ -96,7 +97,7 @@
return o;
}
- void handleReplicatedMethod(MethodCall m, Configuration.CacheMode mode) throws
Throwable
+ void handleReplicatedMethod(MethodCall m, boolean synchronous) throws Throwable
{
if (log.isTraceEnabled())
{
@@ -104,7 +105,7 @@
configuration.getCacheMode() + ", exclude_self=" + true +
", timeout=" +
configuration.getSyncReplTimeout());
}
- if (mode == Configuration.CacheMode.REPL_ASYNC || m.getMethodId() ==
MethodDeclarations.putForExternalReadMethodLocal_id)
+ if (!synchronous || m.getMethodId() ==
MethodDeclarations.putForExternalReadMethodLocal_id)
{
// 2. Replicate change to all *other* members (exclude self !)
replicateCall(m, false);