[jboss-cvs] JBossCache/src/org/jboss/cache/interceptors ...
Manik Surtani
msurtani at jboss.com
Sat Nov 11 14:55:19 EST 2006
User: msurtani
Date: 06/11/11 14:55:19
Modified: src/org/jboss/cache/interceptors TxInterceptor.java
OptimisticValidatorInterceptor.java
OptimisticReplicationInterceptor.java
Log:
JBCACHE-842 and JBCACHE-843
Revision Changes Path
1.64 +30 -4 JBossCache/src/org/jboss/cache/interceptors/TxInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TxInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/TxInterceptor.java,v
retrieving revision 1.63
retrieving revision 1.64
diff -u -b -r1.63 -r1.64
--- TxInterceptor.java 10 Nov 2006 02:48:46 -0000 1.63
+++ TxInterceptor.java 11 Nov 2006 19:55:19 -0000 1.64
@@ -20,6 +20,7 @@
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersion;
import org.jgroups.Address;
import javax.transaction.Status;
@@ -355,7 +356,7 @@
// TODO: Manik: Refactor this to pass across entire workspace?
Object retval;
if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx);
- replayModifications(modifications, ltx);
+ replayModifications(modifications, ltx, true);
retval = super.invoke(m);
// JBCACHE-361 Confirm that the transaction is ACTIVE
if (!isActive(ltx))
@@ -376,7 +377,7 @@
// now pass up the prepare method itself.
try
{
- replayModifications(modifications, ltx);
+ replayModifications(modifications, ltx, false);
if (isOnePhaseCommitPrepareMehod(m))
{
log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
@@ -449,7 +450,7 @@
return null;
}
- private Object replayModifications(List<MethodCall> modifications, Transaction tx)
+ private Object replayModifications(List<MethodCall> modifications, Transaction tx, boolean injectDataVersions)
{
Object retval = null;
@@ -459,7 +460,20 @@
{
try
{
+ if (injectDataVersions)
+ {
+ Object[] origArgs = modification.getArgs();
+ injectDataVersion((DataVersion) origArgs[origArgs.length - 1]);
+ // modify the call to the non-dataversioned counterpart since we've popped out the data version
+ Object[] args = new Object[origArgs.length - 1];
+ System.arraycopy(origArgs, 0, args, 0, args.length);
+
+ retval = super.invoke(MethodCallFactory.create(MethodDeclarations.getUnversionedMethod(modification.getMethodId()), args));
+ }
+ else
+ {
retval = super.invoke(modification);
+ }
if (!isActive(tx))
{
throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + tx.getStatus());
@@ -470,6 +484,11 @@
log.error("method invocation failed", t);
retval = t;
}
+ finally
+ {
+ // reset any options
+ if (injectDataVersions) cache.getInvocationContext().setOptionOverrides(null);
+ }
if (retval != null && retval instanceof Exception)
{
throw new RuntimeException((Exception) retval);
@@ -480,6 +499,13 @@
return retval;
}
+ public void injectDataVersion(DataVersion v)
+ {
+ Option o = new Option();
+ o.setDataVersion(v);
+ cache.getInvocationContext().setOptionOverrides(o);
+ }
+
/**
* Handles a commit or a rollback for a remote gtx. Called by invoke().
*
1.38 +13 -3 JBossCache/src/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticValidatorInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java,v
retrieving revision 1.37
retrieving revision 1.38
diff -u -b -r1.37 -r1.38
--- OptimisticValidatorInterceptor.java 10 Nov 2006 20:32:51 -0000 1.37
+++ OptimisticValidatorInterceptor.java 11 Nov 2006 19:55:19 -0000 1.38
@@ -15,6 +15,7 @@
import org.jboss.cache.TreeCacheProxyImpl;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersioningException;
import org.jboss.cache.optimistic.DefaultDataVersion;
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.WorkspaceNode;
@@ -132,10 +133,19 @@
{
throw new CacheException("Real node for " + fqn + " is null, and this wasn't newly created in this tx!");
}
- if (!workspaceNode.isCreated() && realNode.getVersion().newerThan(workspaceNode.getVersion()))
+
+ if (!workspaceNode.isCreated())
+ {
+ // test that the 2 DataVersion types match up
+ if (!realNode.getVersion().getClass().equals(workspaceNode.getVersion().getClass()))
+ {
+ throw new DataVersioningException("Attempting to apply data version of type " + workspaceNode.getVersion().getClass() + " to a node that already contains version of type " + realNode.getVersion().getClass());
+ }
+ if ((workspaceNode.isDeleted() || workspaceNode.isDirty()) && realNode.getVersion().newerThan(workspaceNode.getVersion()))
{
// we have an out of date node here
- throw new CacheException("DataNode [" + fqn + "] version " + ((OptimisticTreeNode) workspaceNode.getNode()).getVersion() + " is newer than workspace node " + workspaceNode.getVersion());
+ throw new DataVersioningException("DataNode [" + fqn + "] version " + ((OptimisticTreeNode) workspaceNode.getNode()).getVersion() + " is newer than workspace node " + workspaceNode.getVersion());
+ }
}
}
}
1.28 +83 -1 JBossCache/src/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticReplicationInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -b -r1.27 -r1.28
--- OptimisticReplicationInterceptor.java 10 Nov 2006 20:32:51 -0000 1.27
+++ OptimisticReplicationInterceptor.java 11 Nov 2006 19:55:19 -0000 1.28
@@ -8,14 +8,22 @@
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import org.jboss.cache.CacheException;
+import org.jboss.cache.Fqn;
import org.jboss.cache.GlobalTransaction;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.OptimisticTransactionEntry;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersion;
+import org.jboss.cache.optimistic.DefaultDataVersion;
+import org.jboss.cache.optimistic.TransactionWorkspace;
+import org.jboss.cache.optimistic.WorkspaceNode;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -153,13 +161,16 @@
if (cache.getMembers() != null && cache.getMembers().size() > 1)
{
+ // See JBCACHE-843 and docs/design/DataVersioning.txt
+ MethodCall toBroadcast = mapDataVersionedMethodCalls(methodCall, getTransactionWorkspace(gtx));
+
//record the things we have possibly sent
broadcastTxs.put(gtx, gtx);
if (log.isDebugEnabled()) log.debug("(" + cache.getLocalAddress()
+ "): broadcasting prepare for " + gtx
+ " (" + num_mods + " modifications");
- replicateCall(methodCall, remoteCallSync);
+ replicateCall(toBroadcast, remoteCallSync);
}
else
{
@@ -225,4 +236,75 @@
}
}
}
+
+ private MethodCall mapDataVersionedMethodCalls(MethodCall m, TransactionWorkspace w)
+ {
+ Object[] origArgs = m.getArgs();
+ return MethodCallFactory.create(m.getMethod(), origArgs[0], translate((List) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4]);
+ }
+
+ /**
+ * Translates a list of MethodCalls from non-versioned calls to versioned calls.
+ */
+ private List translate(List l, TransactionWorkspace w)
+ {
+ List newList = new ArrayList();
+ Iterator origCalls = l.iterator();
+ while (origCalls.hasNext())
+ {
+ MethodCall origCall = (MethodCall) origCalls.next();
+ Object[] origArgs = origCall.getArgs();
+ // get the data version associated with this orig call.
+
+ // since these are all crud methods the Fqn is at arg subscript 1.
+ Fqn fqn = (Fqn) origArgs[1];
+ // now get a hold of the data version for this specific modification
+ DataVersion versionToBroadcast = getVersionToBroadcast(w, fqn);
+
+ // build up the new arguments list for the new call. Identical to the original lis except that it has the
+ // data version tacked on to the end.
+ Object[] newArgs = new Object[origArgs.length + 1];
+ for (int i = 0; i < origArgs.length; i++) newArgs[i] = origArgs[i];
+ newArgs[origArgs.length] = versionToBroadcast;
+
+ // now create a new method call which contains this data version
+ MethodCall newCall = MethodCallFactory.create(MethodDeclarations.getVersionedMethod(origCall.getMethodId()), newArgs);
+
+ // and add it to the new list.
+ newList.add(newCall);
+ }
+ return newList;
+ }
+
+ /**
+ * Digs out the DataVersion for a given Fqn. If the versioning is explicit, it is passed as-is. If implicit, it is
+ * cloned and then incremented, and the clone is returned.
+ */
+ private DataVersion getVersionToBroadcast(TransactionWorkspace w, Fqn f)
+ {
+ WorkspaceNode n = w.getNode(f);
+ if (w.isVersioningImplicit())
+ {
+ DefaultDataVersion v = (DefaultDataVersion) n.getVersion();
+ return v.increment();
+ }
+ else
+ {
+ return n.getVersion();
+ }
+ }
+
+ protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws CacheException
+ {
+ OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry) cache.getTransactionTable().get(gtx);
+
+ if (transactionEntry == null)
+ {
+ throw new CacheException("unable to map global transaction " + gtx + " to transaction entry");
+ }
+
+ // try and get the workspace from the transaction
+ return transactionEntry.getTransactionWorkSpace();
+ }
+
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list