[jboss-cvs] JBossCache/src/org/jboss/cache/interceptors ...
Manik Surtani
msurtani at jboss.com
Sat Nov 11 12:50:03 EST 2006
User: msurtani
Date: 06/11/11 12:50:03
Modified: src/org/jboss/cache/interceptors Tag:
Branch_JBossCache_1_4_0
OptimisticValidatorInterceptor.java
OptimisticCreateIfNotExistsInterceptor.java
OptimisticNodeInterceptor.java TxInterceptor.java
OptimisticReplicationInterceptor.java
Log:
JBCACHE-843
Revision Changes Path
No revision
No revision
1.25.2.3 +13 -3 JBossCache/src/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticValidatorInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java,v
retrieving revision 1.25.2.2
retrieving revision 1.25.2.3
diff -u -b -r1.25.2.2 -r1.25.2.3
--- OptimisticValidatorInterceptor.java 10 Nov 2006 20:03:33 -0000 1.25.2.2
+++ OptimisticValidatorInterceptor.java 11 Nov 2006 17:50:03 -0000 1.25.2.3
@@ -15,6 +15,7 @@
import org.jboss.cache.TreeCache;
import org.jboss.cache.marshall.JBCMethodCall;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersioningException;
import org.jboss.cache.optimistic.DefaultDataVersion;
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.WorkspaceNode;
@@ -129,10 +130,19 @@
{
throw new CacheException("Real node for " + fqn + " is null, and this wasn't newly created in this tx!");
}
- if (!workspaceNode.isCreated() && realNode.getVersion().newerThan( workspaceNode.getVersion() ))
+
+ if (!workspaceNode.isCreated() )
+ {
+ // test that the 2 DataVersion types match up
+ if (!realNode.getVersion().getClass().equals(workspaceNode.getVersion().getClass()))
+ {
+ throw new DataVersioningException("Attempting to apply data version of type " + workspaceNode.getVersion().getClass() + " to a node that already contains version of type " + realNode.getVersion().getClass());
+ }
+ if (realNode.getVersion().newerThan( workspaceNode.getVersion() ))
{
// we have an out of date node here
- throw new CacheException("DataNode [" + fqn + "] version " + ((OptimisticTreeNode)workspaceNode.getNode()).getVersion() + " is newer than workspace node " + workspaceNode.getVersion());
+ throw new DataVersioningException("DataNode [" + fqn + "] version " + ((OptimisticTreeNode)workspaceNode.getNode()).getVersion() + " is newer than workspace node " + workspaceNode.getVersion());
+ }
}
}
}
1.21.2.1 +12 -12 JBossCache/src/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticCreateIfNotExistsInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java,v
retrieving revision 1.21
retrieving revision 1.21.2.1
diff -u -b -r1.21 -r1.21.2.1
--- OptimisticCreateIfNotExistsInterceptor.java 30 May 2006 16:33:02 -0000 1.21
+++ OptimisticCreateIfNotExistsInterceptor.java 11 Nov 2006 17:50:03 -0000 1.21.2.1
@@ -6,7 +6,16 @@
*/
package org.jboss.cache.interceptors;
-import org.jboss.cache.*;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.DataNode;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.GlobalTransaction;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.OptimisticTransactionEntry;
+import org.jboss.cache.TransactionEntry;
+import org.jboss.cache.TransactionTable;
+import org.jboss.cache.TreeCache;
+import org.jboss.cache.TreeNode;
import org.jboss.cache.factories.NodeFactory;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.optimistic.DataVersion;
@@ -26,14 +35,6 @@
*/
public class OptimisticCreateIfNotExistsInterceptor extends OptimisticInterceptor
{
- private static final List putMethods = new ArrayList(3);
-
- static
- {
- putMethods.add(MethodDeclarations.putDataEraseMethodLocal);
- putMethods.add(MethodDeclarations.putDataMethodLocal);
- putMethods.add(MethodDeclarations.putKeyValMethodLocal);
- }
public void setCache(TreeCache cache)
{
@@ -42,9 +43,8 @@
public Object invoke(MethodCall m) throws Throwable
{
-
//should this be just put methods
- if (putMethods.contains(m.getMethod()))
+ if (MethodDeclarations.isOptimisticPutMethod(m.getMethod()))
{
Object[] args = m.getArgs();
Fqn fqn = (Fqn) (args != null ? args[1] : null);
1.21.2.4 +0 -0 JBossCache/src/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
1.48.2.8 +34 -6 JBossCache/src/org/jboss/cache/interceptors/TxInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: TxInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/TxInterceptor.java,v
retrieving revision 1.48.2.7
retrieving revision 1.48.2.8
diff -u -b -r1.48.2.7 -r1.48.2.8
--- TxInterceptor.java 19 Oct 2006 14:29:13 -0000 1.48.2.7
+++ TxInterceptor.java 11 Nov 2006 17:50:03 -0000 1.48.2.8
@@ -19,6 +19,7 @@
import org.jboss.cache.marshall.JBCMethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersion;
import org.jgroups.Address;
import org.jgroups.blocks.MethodCall;
@@ -41,7 +42,7 @@
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author <a href="mailto:stevew at jofti.com">Steve Woodcock (stevew at jofti.com)</a>
*/
-public class TxInterceptor extends Interceptor implements TxInterceptorMBean
+public class TxInterceptor extends OptimisticInterceptor implements TxInterceptorMBean
{
/**
* List <Transaction>that we have registered for
@@ -432,7 +433,7 @@
// TODO: Manik: Refactor this to pass across entire workspace?
Object retval;
if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx);
- replayModifications(modifications, ltx);
+ replayModifications(modifications, ltx, true);
retval = super.invoke(m);
// JBCACHE-361 Confirm that the transaction is ACTIVE
if (!isActive(ltx))
@@ -453,7 +454,7 @@
// now pass up the prepare method itself.
try
{
- replayModifications(modifications, ltx);
+ replayModifications(modifications, ltx, false);
if (isOnePhaseCommitPrepareMehod(m))
{
log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
@@ -524,7 +525,7 @@
return null;
}
- private Object replayModifications(List modifications, Transaction tx)
+ private Object replayModifications(List modifications, Transaction tx, boolean injectDataVersions)
{
Object retval = null;
@@ -532,10 +533,23 @@
{
for (Iterator it = modifications.iterator(); it.hasNext();)
{
- MethodCall method_call = (MethodCall) it.next();
+ JBCMethodCall method_call = (JBCMethodCall) it.next();
try
{
+ if (injectDataVersions)
+ {
+ Object[] origArgs = method_call.getArgs();
+ injectDataVersion((DataVersion) origArgs[origArgs.length - 1]);
+ // modify the call to the non-dataversioned counterpart since we've popped out the data version
+ Object[] args = new Object[origArgs.length - 1];
+ for (int i=0; i<args.length; i++) args[i] = origArgs[i];
+
+ retval = super.invoke(MethodCallFactory.create(MethodDeclarations.getUnversionedMethod(method_call.getMethodId()), args));
+ }
+ else
+ {
retval = super.invoke(method_call);
+ }
if (!isActive(tx))
{
throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + tx.getStatus());
@@ -546,6 +560,12 @@
log.error("method invocation failed", t);
retval = t;
}
+ finally
+ {
+ // reset any options
+ if (injectDataVersions) getInvocationContext().setOptionOverrides(null);
+ }
+
if (retval != null && retval instanceof Exception)
{
throw new RuntimeException((Exception) retval);
@@ -556,6 +576,14 @@
return retval;
}
+
+
+ public void injectDataVersion(DataVersion v)
+ {
+ Option o = new Option();
+ o.setDataVersion(v);
+ getInvocationContext().setOptionOverrides(o);
+ }
/**
* Handles a commit or a rollback for a remote gtx. Called by invoke().
* @param m
1.21.2.2 +82 -1 JBossCache/src/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticReplicationInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java,v
retrieving revision 1.21.2.1
retrieving revision 1.21.2.2
diff -u -b -r1.21.2.1 -r1.21.2.2
--- OptimisticReplicationInterceptor.java 10 Nov 2006 20:03:33 -0000 1.21.2.1
+++ OptimisticReplicationInterceptor.java 11 Nov 2006 17:50:03 -0000 1.21.2.2
@@ -8,15 +8,23 @@
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import org.jboss.cache.CacheException;
+import org.jboss.cache.Fqn;
import org.jboss.cache.GlobalTransaction;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.OptimisticTransactionEntry;
import org.jboss.cache.TreeCache;
import org.jboss.cache.config.Option;
import org.jboss.cache.marshall.JBCMethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersion;
+import org.jboss.cache.optimistic.DefaultDataVersion;
+import org.jboss.cache.optimistic.TransactionWorkspace;
+import org.jboss.cache.optimistic.WorkspaceNode;
import org.jgroups.blocks.MethodCall;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -155,6 +163,9 @@
List modifications = (List) args[1];
int num_mods = modifications != null ? modifications.size() : 0;
+ // See JBCACHE-843 and docs/design/DataVersioning.txt
+ JBCMethodCall toBroadcast = mapDataVersionedMethodCalls(methodCall, getTransactionWorkspace(gtx));
+
// this method will return immediately if we're the only member (because
// exclude_self=true)
@@ -167,7 +178,7 @@
+ "): broadcasting prepare for " + gtx
+ " (" + num_mods + " modifications");
- replicateCall(methodCall, remoteCallSync);
+ replicateCall(toBroadcast, remoteCallSync);
}
else
{
@@ -178,6 +189,63 @@
return null;
}
+ private JBCMethodCall mapDataVersionedMethodCalls(JBCMethodCall m, TransactionWorkspace w)
+ {
+ Object[] origArgs = m.getArgs();
+ return MethodCallFactory.create(m.getMethod(), new Object[]{origArgs[0], translate((List) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4] });
+ }
+
+ /**
+ * Translates a list of MethodCalls from non-versioned calls to versioned calls.
+ */
+ private List translate(List l, TransactionWorkspace w)
+ {
+ List newList = new ArrayList();
+ Iterator origCalls = l.iterator();
+ while (origCalls.hasNext())
+ {
+ JBCMethodCall origCall = (JBCMethodCall) origCalls.next();
+ Object[] origArgs = origCall.getArgs();
+ // get the data version associated with this orig call.
+
+ // since these are all crud methods the Fqn is at arg subscript 1.
+ Fqn fqn = (Fqn) origArgs[1];
+ // now get a hold of the data version for this specific modification
+ DataVersion versionToBroadcast = getVersionToBroadcast(w, fqn);
+
+ // build up the new arguments list for the new call. Identical to the original list except that it has the
+ // data version tacked on to the end.
+ Object[] newArgs = new Object[origArgs.length + 1];
+ for (int i=0; i<origArgs.length; i++) newArgs[i] = origArgs[i];
+ newArgs[origArgs.length] = versionToBroadcast;
+
+ // now create a new method call which contains this data version
+ JBCMethodCall newCall = MethodCallFactory.create(MethodDeclarations.getVersionedMethod(origCall.getMethodId()), newArgs);
+
+ // and add it to the new list.
+ newList.add(newCall);
+ }
+ return newList;
+ }
+
+ /**
+ * Digs out the DataVersion for a given Fqn. If the versioning is explicit, it is passed as-is. If implicit, it is
+ * cloned and then incremented, and the clone is returned.
+ */
+ private DataVersion getVersionToBroadcast(TransactionWorkspace w, Fqn f)
+ {
+ WorkspaceNode n = w.getNode(f);
+ if (w.isVersioningImplicit())
+ {
+ DefaultDataVersion v = (DefaultDataVersion) n.getVersion();
+ return v.increment();
+ }
+ else
+ {
+ return n.getVersion();
+ }
+ }
+
protected void broadcastCommit(GlobalTransaction gtx) throws Throwable
{
@@ -209,6 +277,19 @@
}
}
+ protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws CacheException
+ {
+ OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry) cache.getTransactionTable().get(gtx);
+
+ if (transactionEntry == null)
+ {
+ throw new CacheException("unable to map global transaction " + gtx + " to transaction entry");
+ }
+
+ // try and get the workspace from the transaction
+ return transactionEntry.getTransactionWorkSpace();
+ }
+
protected void broadcastRollback(GlobalTransaction gtx) throws Throwable
{
boolean remoteCallSync = cache.getSyncRollbackPhase();
More information about the jboss-cvs-commits
mailing list