[jboss-cvs] JBossCache/src/org/jboss/cache/interceptors ...
Manik Surtani
manik at jboss.org
Thu Mar 29 12:02:59 EDT 2007
User: msurtani
Date: 07/03/29 12:02:59
Modified: src/org/jboss/cache/interceptors
OptimisticCreateIfNotExistsInterceptor.java
OptimisticLockingInterceptor.java
OptimisticNodeInterceptor.java
OptimisticInterceptor.java
OptimisticValidatorInterceptor.java
OptimisticReplicationInterceptor.java
Log:
Optimistic locking refactorings
Revision Changes Path
1.46 +69 -100 JBossCache/src/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticCreateIfNotExistsInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java,v
retrieving revision 1.45
retrieving revision 1.46
diff -u -b -r1.45 -r1.46
--- OptimisticCreateIfNotExistsInterceptor.java 7 Feb 2007 22:06:41 -0000 1.45
+++ OptimisticCreateIfNotExistsInterceptor.java 29 Mar 2007 16:02:58 -0000 1.46
@@ -20,33 +20,36 @@
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.WorkspaceNode;
import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.OptimisticTransactionEntry;
-import org.jboss.cache.transaction.TransactionEntry;
import java.util.ArrayList;
import java.util.List;
/**
- * Used to make copies of nodes from the main tree into the
- * {@link TransactionWorkspace} as and when needed.
+ * Used to create new {@link NodeSPI} instances in the main data structure and then copy them into the
+ * {@link TransactionWorkspace} as {@link WorkspaceNode}s as needed. This is only invoked if the nodes needed do not exist
+ * in the underlying structure; they are then added, and the corresponding {@link org.jboss.cache.optimistic.WorkspaceNode#isCreated()}
+ * would return <tt>true</tt> to denote that this node has been freshly created in the underlying structure.
*
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author <a href="mailto:stevew at jofti.com">Steve Woodcock (stevew at jofti.com)</a>
*/
public class OptimisticCreateIfNotExistsInterceptor extends OptimisticInterceptor
{
+ /**
+ * A reference to the node factory registered with the cache instance, used to create both WorkspaceNodes as well as
+ * NodeSPI objects in the underlying data structure.
+ */
private NodeFactory nodeFactory;
public void setCache(CacheSPI cache)
{
super.setCache(cache);
+ // set a reference to the node factory
nodeFactory = cache.getConfiguration().getRuntimeConfig().getNodeFactory();
}
public Object invoke(MethodCall m) throws Throwable
{
-
- //should this be just put methods
if (MethodDeclarations.isPutMethod(m.getMethodId()))
{
Object[] args = m.getArgs();
@@ -76,7 +79,7 @@
greedyGetFqns(fqns, node, newParent);
- if (log.isTraceEnabled()) log.trace("Adding Fqns to workspace " + fqns);
+ if (trace) log.trace("Adding Fqns " + fqns + " for a move() operation.");
for (Fqn f : fqns)
@@ -88,37 +91,26 @@
/**
* The only method that should be creating nodes.
*
- * @param fqn
+ * @param targetFqn
* @throws CacheException
*/
- private void createNode(Fqn fqn, boolean suppressNotification) throws CacheException
+ private void createNode(Fqn targetFqn, boolean suppressNotification) throws CacheException
{
- // we do nothing if fqn is null
- if (fqn == null) return;
+ // we do nothing if targetFqn is null
+ if (targetFqn == null) return;
- // get the transaction to create the nodes in
- GlobalTransaction gtx = cache.getInvocationContext().getGlobalTransaction();
- TransactionEntry baseTransactionEntry = txTable.get(gtx);
- OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry) baseTransactionEntry;
+ boolean debug = log.isDebugEnabled();
- if (transactionEntry == null)
- {
- throw new CacheException("Unable to map global transaction " + gtx + " to transaction entry");
- }
+ InvocationContext ctx = cache.getInvocationContext();
+ GlobalTransaction gtx = getGlobalTransaction();
+ TransactionWorkspace workspace = getTransactionWorkspace(gtx);
- WorkspaceNode workspaceNode, childWorkspaceNode;
- Object childName;
+ WorkspaceNode workspaceNode;
List<Fqn> nodesCreated = new ArrayList<Fqn>();
- // how many levels do we have?
- int treeNodeSize = fqn.size();
-
- InvocationContext ctx = cache.getInvocationContext();
- // try and get the root from the transaction
- TransactionWorkspace workspace = transactionEntry.getTransactionWorkSpace();
-
- boolean debug = log.isDebugEnabled();
+ // synchronize on the workspace so that more than one thread doesn't attempt to put stuff into the workspace for
+ // the same transaction.
synchronized (workspace)
{
DataVersion version = null;
@@ -128,136 +120,113 @@
workspace.setVersioningImplicit(false);
}
+ // start with the ROOT node and then work our way down to the node necessary, creating nodes along the way.
workspaceNode = workspace.getNode(Fqn.ROOT);
- if (debug) log.debug("Global TX: " + gtx + " Root: " + workspaceNode);
+ if (debug) log.debug("GlobalTransaction: " + gtx + "; Root: " + workspaceNode);
- // we do not have the root so lets wrap it in case we need to add it
- // to the transaction
+ // we do not have the root in the workspace! Put it into the workspace now.
if (workspaceNode == null)
{
NodeSPI node = cache.getRoot();
workspaceNode = nodeFactory.createWorkspaceNode(node, workspace);
workspace.addNode(workspaceNode);
- if (debug) log.debug("Created root node: " + workspaceNode);
+ log.debug("Created root node in workspace.");
}
else
{
- if (debug) log.debug("Found root node: " + workspaceNode);
+ log.debug("Found root node in workspace.");
}
// we will always have one root node here, by this stage
- Fqn tmpFqn = Fqn.ROOT;
- for (int i = 0; i < treeNodeSize; i++)
- {
- boolean isTargetFqn = (i == (treeNodeSize - 1));
- childName = fqn.get(i);
+ Fqn currentFqn = Fqn.ROOT;
- // build up intermediate node fqn from original Fqn
- tmpFqn = new Fqn(tmpFqn, childName);
+ // iterate through the target Fqn's elements.
+ int targetFqnSize = targetFqn.size(), currentDepth = 1;
+ for (Object childName : targetFqn.peekElements())
+ {
+ boolean isTargetFqn = (currentDepth == targetFqnSize);
+ currentDepth++;
// current workspace node canot be null.
// try and get the child of current node
- if (debug)
- {
- log.debug("workspaceNode.getChild(" + childName + ")");
- }
- NodeSPI tempchildNode = workspaceNode.getChild(new Fqn(childName));
+ if (debug) log.debug("Attempting to get child " + childName);
+ NodeSPI currentNode = workspaceNode.getChild(childName);
- // if (log.isDebugEnabled()) log.debug(" Entered synchronized workspaceNode " + workspaceNode + " access for gtx " + gtx);
- // no child exists with this name
- if (tempchildNode == null)
+ if (currentNode == null)
{
- if (debug) log.debug("Creating new child, doesn't exist");
- // we put the parent node into the workspace as we are changing it's children
- WorkspaceNode tempCheckWrapper = workspace.getNode(workspaceNode.getFqn());
- if (tempCheckWrapper == null || tempCheckWrapper.isDeleted())
+ // no child exists with this name; create it in the underlying data structure and then add it to the workspace.
+ if (trace) log.trace("Creating new child, since it doesn't exist in the cache.");
+ // we put the parent node into the workspace as we are changing its children.
+ // at this point "workspaceNode" refers to the parent of the current node. It should never be null if
+ // you got this far!
+ if (workspaceNode.isDeleted())
{
//add a new one or overwrite an existing one that has been deleted
- if (debug)
- {
- log.debug("Parent node doesn't exist in workspace or has been deleted. Adding to workspace.");
- }
+ if (trace)
+ log.trace("Parent node doesn't exist in workspace or has been deleted. Adding to workspace.");
workspace.addNode(workspaceNode);
if (!(workspaceNode.getVersion() instanceof DefaultDataVersion))
workspaceNode.setVersioningImplicit(false);
}
else
{
- if (debug)
- {
- log.debug("Parent node exists: " + workspaceNode);
+ if (trace) log.trace("Parent node exists: " + workspaceNode);
}
- }
- // this does not add it into the real child nodes - but in its
- // local child map for the transaction
// get the version passed in, if we need to use explicit versioning.
DataVersion versionToPassIn = null;
- if (isTargetFqn && !workspace.isVersioningImplicit())
- {
- versionToPassIn = version;
- }
+ if (isTargetFqn && !workspace.isVersioningImplicit()) versionToPassIn = version;
- NodeSPI tempNode = workspaceNode.createChild(childName, workspaceNode.getNode(), cache, versionToPassIn);
+ NodeSPI newUnderlyingChildNode = workspaceNode.createChild(childName, workspaceNode.getNode(), cache, versionToPassIn);
- childWorkspaceNode = nodeFactory.createWorkspaceNode(tempNode, workspace);
- childWorkspaceNode.setVersioningImplicit(versionToPassIn == null || !isTargetFqn);
- if (log.isTraceEnabled())
- {
- log.trace("setting versioning of " + childWorkspaceNode.getFqn() + " to be " + (childWorkspaceNode.isVersioningImplicit() ? "implicit" : "explicit"));
- }
+ // now assign "workspaceNode" to the new child created.
+ workspaceNode = nodeFactory.createWorkspaceNode(newUnderlyingChildNode, workspace);
+ workspaceNode.setVersioningImplicit(versionToPassIn == null || !isTargetFqn);
+ if (trace)
+ log.trace("setting versioning of " + workspaceNode.getFqn() + " to be " + (workspaceNode.isVersioningImplicit() ? "implicit" : "explicit"));
// now add the wrapped child node into the transaction space
- workspace.addNode(childWorkspaceNode);
- childWorkspaceNode.markAsCreated();
+ workspace.addNode(workspaceNode);
+ workspaceNode.markAsCreated();
// save in list so we can broadcast our created nodes outside
// the synch block
- nodesCreated.add(tmpFqn);
-
+ nodesCreated.add(currentFqn);
}
else
{
// node does exist but might not be in the workspace
- childWorkspaceNode = workspace.getNode(tempchildNode.getFqn());
+ workspaceNode = workspace.getNode(currentNode.getFqn());
// wrap it up so we can put it in later if we need to
- if (childWorkspaceNode == null || childWorkspaceNode.isDeleted())
- {
- if (debug)
+ if (workspaceNode == null || workspaceNode.isDeleted())
{
- log.debug("Child node " + tempchildNode.getFqn() + " doesn't exist in workspace or has been deleted. Adding to workspace in gtx " + gtx);
- }
- childWorkspaceNode = nodeFactory.createWorkspaceNode(tempchildNode, workspace);
+ if (trace)
+ log.trace("Child node " + currentNode.getFqn() + " doesn't exist in workspace or has been deleted. Adding to workspace in gtx " + gtx);
+
+ workspaceNode = nodeFactory.createWorkspaceNode(currentNode, workspace);
if (isTargetFqn && !workspace.isVersioningImplicit())
{
- childWorkspaceNode.setVersion(version);
- childWorkspaceNode.setVersioningImplicit(false);
+ workspaceNode.setVersion(version);
+ workspaceNode.setVersioningImplicit(false);
}
else
{
- childWorkspaceNode.setVersioningImplicit(true);
- }
- if (log.isTraceEnabled())
- {
- log.trace("setting versioning of " + childWorkspaceNode.getFqn() + " to be " + (childWorkspaceNode.isVersioningImplicit() ? "implicit" : "explicit"));
+ workspaceNode.setVersioningImplicit(true);
}
-
+ if (trace)
+ log.trace("setting versioning of " + workspaceNode.getFqn() + " to be " + (workspaceNode.isVersioningImplicit() ? "implicit" : "explicit"));
+ workspace.addNode(workspaceNode);
}
else
{
- if (debug)
- {
- log.debug("Found child node: " + tempchildNode);
- }
+ if (trace) log.trace("Found child node in the workspace: " + currentNode);
}
}
- workspaceNode = childWorkspaceNode;
}
}// end sync block
- if (debug) log.debug("done synchronized access of GlobalTX");
if (!suppressNotification)
{
@@ -268,7 +237,7 @@
{
n.notifyNodeCreated(temp, true, false);
n.notifyNodeCreated(temp, false, false);
- if (log.isDebugEnabled()) log.debug("Notifying cache of node created in workspace " + temp);
+ if (trace) log.trace("Notifying cache of node created in workspace " + temp);
}
}
}
1.29 +61 -80 JBossCache/src/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticLockingInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -b -r1.28 -r1.29
--- OptimisticLockingInterceptor.java 12 Mar 2007 18:13:46 -0000 1.28
+++ OptimisticLockingInterceptor.java 29 Mar 2007 16:02:58 -0000 1.29
@@ -8,7 +8,6 @@
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
-import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.lock.NodeLock;
import org.jboss.cache.marshall.MethodCall;
@@ -18,10 +17,9 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
-import java.lang.reflect.Method;
-
/**
- * Locks nodes during transaction boundaries
+ * Locks nodes during transaction boundaries. Only affects prepare/commit/rollback method calls; other method calls
+ * are simply passed up the interceptor stack.
*
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author <a href="mailto:stevew at jofti.com">Steve Woodcock (stevew at jofti.com)</a>
@@ -38,22 +36,7 @@
public Object invoke(MethodCall m) throws Throwable
{
- // bypass for buddy group org method calls.
- if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(m);
-
- InvocationContext ctx = cache.getInvocationContext();
Object retval = null;
- Method meth = m.getMethod();
-
- // bail out if _lock() is being called on the tree cache... this should never be called with o/l enabled.
- if (m.getMethodId() == MethodDeclarations.lockMethodLocal_id)
- {
- throw new CacheException("_lock() passed up the interceptor stack when Optimistic Locking is used. This is NOT supported.");
- }
-
- if (ctx.getTransaction() != null && ctx.getGlobalTransaction() != null)
- {
- GlobalTransaction gtx = ctx.getGlobalTransaction();
//we are interested in the prepare/commit/rollback
//this is irrespective of whether we are local or remote
@@ -61,9 +44,9 @@
{
case MethodDeclarations.optimisticPrepareMethod_id:
//try and acquire the locks - before passing on
+ GlobalTransaction gtx = getGlobalTransaction();
try
{
- if (log.isDebugEnabled()) log.debug("Calling lockNodes() with gtx " + ctx.getGlobalTransaction());
lockNodes(gtx);
}
catch (Throwable e)
@@ -77,68 +60,64 @@
catch (Throwable t)
{
// we have failed to unlock - now what?
- log.fatal("Failed to unlock on prepare ", t);
+ log.error("Failed to unlock nodes, after failing to lock nodes during a prepare! Locks are possibly in a very inconsistent state now!", t);
}
throw e;
-
}
+
// locks have acquired so lets pass on up
retval = super.invoke(m);
break;
case MethodDeclarations.commitMethod_id:
case MethodDeclarations.rollbackMethod_id:
- // we need to let the stack run its commits first -
+ // we need to let the stack run its commits or rollbacks first -
// we unlock last - even if an exception occurs
try
{
retval = super.invoke(m);
}
- catch (Throwable t)
- {
- log.debug("exception encountered on " + meth, t);
- throw t;
- }
finally
{
try
{
- unlock(gtx);
+ unlock(getGlobalTransaction());
}
catch (Exception e)
{
- log.fatal("Failed to unlock on " + meth, e);
+ // we have failed to unlock - now what?
+ log.error("Failed to unlock nodes after a commit or rollback! Locks are possibly in a very inconsistent state now!", e);
}
}
break;
+ case MethodDeclarations.lockMethodLocal_id:
+ // bail out if _lock() is being called on the tree cache... this should never be called with o/l enabled.
+ throw new CacheException("_lock() passed up the interceptor stack when Optimistic Locking is used. This is NOT supported.");
default:
//we do not care, just pass up the chain.
retval = super.invoke(m);
break;
}
- }
- else
- {
- throw new CacheException("Not in a transaction");
- }
-
return retval;
}
- private Object lockNodes(GlobalTransaction gtx) throws Exception
+ /**
+ * Locks all nodes held in the transaction workspace registered with the given global transaction.
+ *
+ * @param gtx global transaction which contains a workspace
+ */
+ private void lockNodes(GlobalTransaction gtx) throws InterruptedException
{
TransactionWorkspace<?, ?> workspace = getTransactionWorkspace(gtx);
- log.debug("Locking nodes in transaction workspace, presumably for a prepare()");
+ if (log.isDebugEnabled()) log.debug("Locking nodes in transaction workspace for GlobalTransaction " + gtx);
- // should be an ordered list
for (WorkspaceNode workspaceNode : workspace.getNodes().values())
{
-
NodeSPI node = workspaceNode.getNode();
boolean acquired = node.getLock().acquire(gtx, lockAcquisitionTimeout, NodeLock.LockType.WRITE);
if (acquired)
{
- if (log.isTraceEnabled()) log.trace("Acquired lock on node " + node.getFqn());
+ if (trace) log.trace("Acquired lock on node " + node.getFqn());
cache.getTransactionTable().addLock(gtx, node.getLock());
}
else
@@ -147,11 +126,13 @@
}
}
- return null;
-
}
-
+ /**
+ * Releases all locks held by the specified global transaction.
+ *
+ * @param gtx which holds locks
+ */
private void unlock(GlobalTransaction gtx)
{
TransactionEntry entry = txTable.get(gtx);
1.59 +12 -18 JBossCache/src/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticNodeInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java,v
retrieving revision 1.58
retrieving revision 1.59
diff -u -b -r1.58 -r1.59
--- OptimisticNodeInterceptor.java 19 Mar 2007 19:03:34 -0000 1.58
+++ OptimisticNodeInterceptor.java 29 Mar 2007 16:02:58 -0000 1.59
@@ -49,7 +49,7 @@
// bypass for buddy group org method calls.
if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(m);
- if (log.isTraceEnabled()) log.trace("Processing method call " + m);
+ if (trace) log.trace("Processing method call " + m);
InvocationContext ctx = cache.getInvocationContext();
Object[] args = m.getArgs();
@@ -69,7 +69,7 @@
Fqn parentFqn = (Fqn) args[1], nodeFqn = (Fqn) args[0];
- if (log.isTraceEnabled())
+ if (trace)
{
log.trace("Adding nodes " + parentFqn + " and " + nodeFqn + " to the workspace.");
}
@@ -78,10 +78,12 @@
if (parent == null) throw new NodeNotExistsException("Node " + parentFqn + " does not exist!");
WorkspaceNode node = getOrCreateWorkspaceNode(nodeFqn, workspace, true);
- if (log.isTraceEnabled()) log.trace("Parent: " + parent);
- if (log.isTraceEnabled()) log.trace("Node: " + node);
-
- if (log.isTraceEnabled()) log.trace("Workspace snapshot: " + workspace);
+ if (trace)
+ {
+ log.trace("Parent: " + parent);
+ log.trace("Node: " + node);
+ log.trace("Workspace snapshot: " + workspace);
+ }
// be greedy about it - get children as well.
greedyGetNodes(node, workspace);
@@ -114,18 +116,13 @@
DataVersion version = ctx.getOptionOverrides().getDataVersion();
workspaceNode.setVersion(version);
- if (log.isTraceEnabled())
- {
- log.trace("Setting versioning for node " + workspaceNode.getFqn() + " to explicit");
- }
+ if (trace) log.trace("Setting versioning for node " + workspaceNode.getFqn() + " to explicit");
+
workspaceNode.setVersioningImplicit(false);
}
else
{
- if (log.isTraceEnabled())
- {
- log.trace("Setting versioning for node " + workspaceNode.getFqn() + " to implicit");
- }
+ if (trace) log.trace("Setting versioning for node " + workspaceNode.getFqn() + " to implicit");
workspaceNode.setVersioningImplicit(true);
}
}
@@ -334,10 +331,7 @@
private boolean removeNode(TransactionWorkspace workspace, WorkspaceNode workspaceNode) throws CacheException
{
- if (log.isTraceEnabled())
- {
- log.trace("removeNode " + workspace + " node=" + workspaceNode);
- }
+ if (trace) log.trace("removeNode " + workspace + " node=" + workspaceNode);
// it is already removed - we can ignore it
if (workspaceNode == null)
1.12 +20 -1 JBossCache/src/org/jboss/cache/interceptors/OptimisticInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticInterceptor.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -b -r1.11 -r1.12
--- OptimisticInterceptor.java 12 Mar 2007 18:13:46 -0000 1.11
+++ OptimisticInterceptor.java 29 Mar 2007 16:02:58 -0000 1.12
@@ -9,12 +9,14 @@
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.List;
@@ -27,12 +29,14 @@
{
protected TransactionManager txManager = null;
protected TransactionTable txTable = null;
+ protected boolean trace;
public void setCache(CacheSPI cache)
{
super.setCache(cache);
txManager = cache.getTransactionManager();
txTable = cache.getTransactionTable();
+ trace = log != null && log.isTraceEnabled();
}
protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws CacheException
@@ -41,7 +45,7 @@
if (transactionEntry == null)
{
- throw new CacheException("unable to map global transaction " + gtx + " to transaction entry");
+ throw new CacheException("Unable to map global transaction " + gtx + " to transaction entry when trying to retrieve transaction workspace.");
}
// try and get the workspace from the transaction
@@ -66,5 +70,20 @@
}
}
+ /**
+ * @return the {@link org.jboss.cache.transaction.GlobalTransaction}, extracted from the current {@link org.jboss.cache.InvocationContext}.
+ * @throws CacheException if the {@link org.jboss.cache.transaction.GlobalTransaction} or {@link javax.transaction.Transaction} associated with the
+ * {@link org.jboss.cache.InvocationContext} is null.
+ */
+ protected GlobalTransaction getGlobalTransaction() throws CacheException
+ {
+ InvocationContext ctx = cache.getInvocationContext();
+ Transaction tx = ctx.getTransaction();
+ if (tx == null) throw new CacheException("Transaction associated with the current invocation is null!");
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ if (gtx == null) throw new CacheException("GlobalTransaction associated with the current invocation is null!");
+ return gtx;
+ }
+
}
1.60 +61 -96 JBossCache/src/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticValidatorInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java,v
retrieving revision 1.59
retrieving revision 1.60
diff -u -b -r1.59 -r1.60
--- OptimisticValidatorInterceptor.java 8 Mar 2007 16:12:49 -0000 1.59
+++ OptimisticValidatorInterceptor.java 29 Mar 2007 16:02:58 -0000 1.60
@@ -8,7 +8,6 @@
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodDeclarations;
@@ -18,25 +17,26 @@
import org.jboss.cache.optimistic.WorkspaceNode;
import org.jboss.cache.transaction.GlobalTransaction;
-import javax.transaction.Transaction;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
- * Validates the data in the transaction workspace against data in the actual
- * cache (versions only), and then performs commits if necessary. Does not
- * pass on prepare/commit/rollbacks to the other interceptors.
+ * Validates the data in the {@link TransactionWorkspace} against data in the underlying data structure
+ * (versions only) and then applies changes to the underlying data structure. This is only triggered when commit,
+ * rollback or prepare method calls are encountered. Other method calls are directly passed up the interceptor chain,
+ * untouched. Note that prepare/commit/rollbacks are <b>not</b> passed up the interceptor chain after being processed.
* <p/>
- * Currently uses simplistic integer based versioning and validation. Plans are
- * to have this configurable as there will always be a performance/complexity
- * tradeoff.
+ * When preparing, this interceptor does nothing more than validate versions.
+ * The validation scheme used is based on the {@link org.jboss.cache.optimistic.DataVersion} implementation used.
+ * {@link org.jboss.cache.optimistic.DataVersion#newerThan(org.jboss.cache.optimistic.DataVersion)} is used to determine
+ * whether the version of one instance is newer than the version of another. It is up to the {@link org.jboss.cache.optimistic.DataVersion}
+ * implementation to deal with attempting to compare incompatible version types (and potentially throwing {@link org.jboss.cache.optimistic.DataVersioningException}s).
* <p/>
- * On the commit it applies the changes in the workspace to the real nodes in
- * the cache.
+ * Upon successful commit, changes in the workspace are applied to the underlying data structure in the cache.
* <p/>
- * On rollback clears the nodes in the workspace.
+ * On rollback clears the nodes in the workspace and leaves the underlying data structure untouched.
*
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
* @author Steve Woodcock (<a href="mailto:stevew at jofti.com">stevew at jofti.com</a>)
@@ -45,32 +45,21 @@
{
public Object invoke(MethodCall m) throws Throwable
{
- // bypass for buddy group org metod calls.
- if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(m);
-
- InvocationContext ctx = cache.getInvocationContext();
- Transaction tx = ctx.getTransaction();
- GlobalTransaction gtx = ctx.getGlobalTransaction();
Object retval = null;
- if (tx == null)
- {
- throw new CacheException("Not in a transaction");
- }
-
// Methods we are interested in are prepare/commit
// They do not go further than this interceptor
switch (m.getMethodId())
{
case MethodDeclarations.optimisticPrepareMethod_id:
// should pass in a different prepare here
- validateNodes(gtx);
+ validateNodes(getGlobalTransaction());
break;
case MethodDeclarations.commitMethod_id:
- commit(gtx);
+ commit(getGlobalTransaction());
break;
case MethodDeclarations.rollbackMethod_id:
- rollBack(gtx);
+ rollBack(getGlobalTransaction());
break;
default:
retval = super.invoke(m);
@@ -79,59 +68,48 @@
return retval;
}
-
private void validateNodes(GlobalTransaction gtx) throws CacheException
{
- TransactionWorkspace workspace;
-
- try
- {
- workspace = getTransactionWorkspace(gtx);
- }
- catch (CacheException e)
- {
- throw new CacheException("unable to retrieve workspace", e);
- }
+ TransactionWorkspace workspace = getTransactionWorkspace(gtx);
- // should be an ordered list - get the set of nodes
+ // There is no guarantee that this collection is in any order!
Collection<WorkspaceNode> nodes = workspace.getNodes().values();
- //we have all locks here so lets try and validate
- if (log.isDebugEnabled()) log.debug("validating nodes. Num nodes: " + nodes.size());
+ //we ought to have all necessary locks here so lets try and validate
+ if (log.isDebugEnabled()) log.debug("Validating " + nodes.size() + " nodes.");
simpleValidate(nodes);
- log.debug("validated nodes");
+ log.debug("Successfully validated nodes");
}
private void simpleValidate(Collection<WorkspaceNode> nodes) throws DataVersioningException
{
- boolean trace = log.isTraceEnabled();
for (WorkspaceNode workspaceNode : nodes)
{
Fqn fqn = workspaceNode.getFqn();
if (trace) log.trace("Validating version for node " + fqn);
- NodeSPI realNode;
- realNode = cache.peek(fqn, true);
+ NodeSPI underlyingNode;
+ underlyingNode = cache.peek(fqn, true);
// if this is a newly created node then we expect the underlying node to be null.
// if not, we have a problem...
- if (realNode == null && !workspaceNode.isCreated())
+ if (underlyingNode == null && !workspaceNode.isCreated())
{
- throw new DataVersioningException("Real node for " + fqn + " is null, and this wasn't newly created in this tx!");
+ throw new DataVersioningException("Underlying node for " + fqn + " is null, and this node wasn't newly created in this transaction! We have a concurrent deletion event.");
}
- if (realNode != null && workspaceNode.isCreated())
+ if (underlyingNode != null && workspaceNode.isCreated())
{
- throw new DataVersioningException("Tx attempted to create " + fqn + " anew. It has already been created since this tx started by another (possibly remote) tx.");
+ throw new DataVersioningException("Transaction attempted to create " + fqn + " anew. It has already been created since this transaction started, by another (possibly remote) transaction. We have a concurrent creation event.");
}
if (!workspaceNode.isCreated() && (workspaceNode.isDeleted() || workspaceNode.isDirty()))
{
// if there is a DataVersion type mismatch here, leave it up to the DataVersion impl to barf if necessary. - JBCACHE-962
- if (realNode.getVersion().newerThan(workspaceNode.getVersion()))
+ if (underlyingNode.getVersion().newerThan(workspaceNode.getVersion()))
{
// we have an out of date node here
- throw new DataVersioningException("DataNode [" + fqn + "] version " + workspaceNode.getNode().getVersion() + " is newer than workspace node " + workspaceNode.getVersion());
+ throw new DataVersioningException("Version mismatch for node " + fqn + ": underlying node with version " + workspaceNode.getNode().getVersion() + " is newer than workspace node, with version " + workspaceNode.getVersion());
}
}
}
@@ -148,98 +126,93 @@
}
catch (CacheException e)
{
- log.trace("we can't rollback", e);
+ log.warn("we can't rollback", e);
return;
}
- log.debug("commiting validated changes ");
- // should be an ordered list
- Collection<WorkspaceNode> nodes = workspace.getNodes().values();
+ if (log.isDebugEnabled()) log.debug("Commiting successfully validated changes for GlobalTransaction " + gtx);
+
- boolean trace = log.isTraceEnabled();
- for (WorkspaceNode wrappedNode : nodes)
+ Collection<WorkspaceNode> workspaceNodes = workspace.getNodes().values();
+
+ for (WorkspaceNode workspaceNode : workspaceNodes)
{
+ NodeSPI underlyingNode = workspaceNode.getNode();
+
// short circuit if this node is deleted?
- if (wrappedNode.isDeleted())
+ if (workspaceNode.isDeleted())
{
- if (trace) log.trace("Workspace node " + wrappedNode.getFqn() + " deleted; removing");
- NodeSPI dNode = wrappedNode.getNode();
+ if (trace) log.trace("Workspace node " + workspaceNode.getFqn() + " deleted; removing");
-
- if (dNode.getFqn().isRoot())
+ if (underlyingNode.getFqn().isRoot())
{
- log.warn("Attempted to delete the root node");
+ throw new CacheException("An illegal attempt to delete the root node!");
}
else
{
- NodeSPI parent = dNode.getParent();
+ NodeSPI parent = underlyingNode.getParent();
if (parent == null)
{
- throw new IllegalStateException("dNode " + dNode + " has no parent");
+ throw new CacheException("Underlying node " + underlyingNode + " has no parent");
}
- parent.removeChildDirect(dNode.getFqn().getLastElement());
+ parent.removeChildDirect(underlyingNode.getFqn().getLastElement());
}
}
else
{
- NodeSPI current = wrappedNode.getNode();
boolean updateVersion = false;
- if (wrappedNode.isChildrenModified())
+ if (workspaceNode.isChildrenModified())
{
log.trace("Updating children since node has modified children");
// merge children.
- List<Set<Fqn>> deltas = wrappedNode.getMergedChildren();
+ List<Set<Fqn>> deltas = workspaceNode.getMergedChildren();
- if (trace) log.trace("Applying children deltas to parent node " + current.getFqn());
+ if (trace) log.trace("Applying children deltas to parent node " + underlyingNode.getFqn());
for (Fqn child : deltas.get(0))
{
- current.addChildDirect(wrappedNode.getChild(child.getLastElement()));
+ underlyingNode.addChildDirect(workspaceNode.getChild(child.getLastElement()));
}
for (Fqn child : deltas.get(1))
{
- current.removeChildDirect(child.getLastElement());
+ underlyingNode.removeChildDirect(child.getLastElement());
}
updateVersion = cache.getConfiguration().isLockParentForChildInsertRemove();
+ // do we need to notify listeners of a modification?? If all we've done is added children then don't
+ // notify.
}
- if (wrappedNode.isDirty())
+ if (workspaceNode.isDirty())
{
- // do we need to notify listeners of a modification?? If all we've done is added children then don't
- // notify.
log.trace("Merging data since node is dirty");
- Map mergedData = wrappedNode.getMergedData();
-
- current.clearDataDirect();
- current.putAllDirect(mergedData);
+ Map mergedData = workspaceNode.getMergedData();
+ underlyingNode.clearDataDirect();
+ underlyingNode.putAllDirect(mergedData);
updateVersion = true;
}
if (updateVersion)
{
- if (wrappedNode.isVersioningImplicit())
+ if (workspaceNode.isVersioningImplicit())
{
if (trace) log.trace("Versioning is implicit; incrementing.");
- current.setVersion(((DefaultDataVersion) wrappedNode.getVersion()).increment());
+ underlyingNode.setVersion(((DefaultDataVersion) workspaceNode.getVersion()).increment());
}
else
{
if (trace) log.trace("Versioning is explicit; not attempting an increment.");
- current.setVersion(wrappedNode.getVersion());
+ underlyingNode.setVersion(workspaceNode.getVersion());
}
+
if (trace)
- {
- log.trace("Setting version of node " + current.getFqn() + " from " + wrappedNode.getVersion() + " to " + current.getVersion());
- }
+ log.trace("Setting version of node " + underlyingNode.getFqn() + " from " + workspaceNode.getVersion() + " to " + underlyingNode.getVersion());
}
else
{
if (trace)
- {
- log.trace("Version update on " + wrappedNode.getFqn() + " not necessary since the node is not dirty or LockParentForChildInsertRemove is set to false");
- }
+ log.trace("Version update on " + workspaceNode.getFqn() + " not necessary since the node is not dirty or LockParentForChildInsertRemove is set to false");
}
}
}
@@ -249,15 +222,7 @@
private void rollBack(GlobalTransaction gtx)
{
TransactionWorkspace workspace;
- try
- {
workspace = getTransactionWorkspace(gtx);
- Map nodes = workspace.getNodes();
- nodes.clear();
- }
- catch (CacheException e)
- {
- log.info("Unable to roll back", e);
- }
+ workspace.clearNodes();
}
}
1.36 +107 -116 JBossCache/src/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: OptimisticReplicationInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -b -r1.35 -r1.36
--- OptimisticReplicationInterceptor.java 19 Mar 2007 19:03:34 -0000 1.35
+++ OptimisticReplicationInterceptor.java 29 Mar 2007 16:02:58 -0000 1.36
@@ -22,13 +22,16 @@
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
- * Replication interceptor for the optimistically locked interceptor chain
+ * Replication interceptor for the optimistically locked interceptor chain. Responsible for replicating
+ * state to remote nodes. Unlike its cousin, the {@link org.jboss.cache.interceptors.ReplicationInterceptor}, this interceptor
+ * only deals with transactional calls. Just like all things to do with Optimistic Locking, it is a requirement that
+ * everything is done in a transaction and the transaction context is available via {@link org.jboss.cache.InvocationContext#getTransaction()}
+ * and {@link org.jboss.cache.InvocationContext#getGlobalTransaction()}.
*
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
* @author <a href="mailto:stevew at jofti.com">Steve Woodcock (stevew at jofti.com)</a>
@@ -36,9 +39,11 @@
public class OptimisticReplicationInterceptor extends BaseRpcInterceptor
{
- //record of local broacasts - so we do not broadcast rollbacks/commits that resuted from
+ // record of local broadcasts - so we do not broadcast rollbacks/commits that resulted from
// local prepare failures
- private Map broadcastTxs = new ConcurrentHashMap();
+ private static final Object DUMMY_VALUE = new Object();
+ // we really just need a set here, but concurrent CopyOnWriteArraySet has poor performance when writing.
+ private final Map<GlobalTransaction, Object> broadcastTxs = new ConcurrentHashMap<GlobalTransaction, Object>();
public Object invoke(MethodCall m) throws Throwable
{
@@ -50,46 +55,35 @@
if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
{
// skip replication!!
+ log.debug("Skipping replication for this call as cache mode is local, forced via an option override.");
return super.invoke(m);
}
Object retval;
- //we need a transaction to be present in order to do this
- if (ctx.getTransaction() != null)
- {
-
- // get the current gtx
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- if (gtx == null)
- {
- throw new CacheException("failed to get global transaction");
- }
- log.debug(" received method " + m);
+ if (log.isTraceEnabled()) log.trace("Processing method " + m);
// on a local prepare we first run the prepare -
//if this works broadcast it
+ GlobalTransaction gtx = null; // don't initialise this here; since some method calls may not have gtxs (such as buddy group organisation calls)
switch (m.getMethodId())
{
case MethodDeclarations.optimisticPrepareMethod_id:
// pass up the chain.
retval = super.invoke(m);
+ gtx = getGlobalTransaction(ctx);
if (!gtx.isRemote() && ctx.isOriginLocal())
{
// replicate the prepare call.
- retval = broadcastPrepare(m, gtx);
- //if we have an exception then the remote methods failed
- if (retval instanceof Throwable)
- {
- throw (Throwable) retval;
- }
+ broadcastPrepare(m, gtx);
}
break;
case MethodDeclarations.commitMethod_id:
//lets broadcast the commit first
- Throwable temp = null;
+ Throwable remoteCommitException = null;
+ gtx = getGlobalTransaction(ctx);
if (!gtx.isRemote() && ctx.isOriginLocal() && broadcastTxs.containsKey(gtx))
{
//we dont do anything
@@ -99,20 +93,21 @@
}
catch (Throwable t)
{
- log.error(" a problem occurred with remote commit", t);
- temp = t;
+ log.error("A problem occurred with remote commit", t);
+ remoteCommitException = t;
}
}
retval = super.invoke(m);
- if (temp != null)
+ if (remoteCommitException != null)
{
- throw temp;
+ throw remoteCommitException;
}
break;
case MethodDeclarations.rollbackMethod_id:
// lets broadcast the rollback first
- Throwable temp2 = null;
+ gtx = getGlobalTransaction(ctx);
+ Throwable remoteRollbackException = null;
if (!gtx.isRemote() && ctx.isOriginLocal() && broadcastTxs.containsKey(gtx))
{
//we dont do anything
@@ -123,34 +118,41 @@
catch (Throwable t)
{
log.error(" a problem occurred with remote rollback", t);
- temp2 = t;
+ remoteRollbackException = t;
}
}
retval = super.invoke(m);
- if (temp2 != null)
+ if (remoteRollbackException != null)
{
- throw temp2;
+ throw remoteRollbackException;
}
break;
case MethodDeclarations.putForExternalReadMethodLocal_id:
+ gtx = getGlobalTransaction(ctx);
cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
// and follow on to default behaviour now ...
default:
//it is something we do not care about
- log.debug(" received method " + m + " not handling");
+ if (log.isTraceEnabled()) log.trace("Received method " + m + " not handling");
retval = super.invoke(m);
break;
}
+ return retval;
}
- else
+
+ private GlobalTransaction getGlobalTransaction(InvocationContext ctx)
+ {
+ // get the current gtx
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ if (gtx == null)
{
- throw new CacheException("transaction does not exist");
+ throw new CacheException("failed to get global transaction");
}
- return retval;
+ return gtx;
}
- protected Object broadcastPrepare(MethodCall methodCall, GlobalTransaction gtx) throws Throwable
+ protected void broadcastPrepare(MethodCall methodCall, GlobalTransaction gtx) throws Throwable
{
boolean remoteCallSync = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
@@ -158,17 +160,15 @@
List modifications = (List) args[1];
int num_mods = modifications != null ? modifications.size() : 0;
- // this method will return immediately if we're the only member (because
- // exclude_self=true)
-
+ // this method will return immediately if we're the only member
if (cache.getMembers() != null && cache.getMembers().size() > 1)
{
-
+ // Map method calls to data versioned equivalents.
// See JBCACHE-843 and docs/design/DataVersioning.txt
MethodCall toBroadcast = mapDataVersionedMethodCalls(methodCall, getTransactionWorkspace(gtx));
//record the things we have possibly sent
- broadcastTxs.put(gtx, gtx);
+ broadcastTxs.put(gtx, DUMMY_VALUE);
if (log.isDebugEnabled())
{
log.debug("(" + cache.getLocalAddress()
@@ -187,7 +187,6 @@
+ "):not broadcasting prepare as members are " + cache.getMembers());
}
}
- return null;
}
@@ -195,30 +194,25 @@
{
boolean remoteCallSync = configuration.isSyncCommitPhase();
- // 1. Multicast commit() to all members (exclude myself though)
+ // Broadcast commit() to all members (exclude myself though)
if (cache.getMembers() != null && cache.getMembers().size() > 1)
{
try
{
broadcastTxs.remove(gtx);
- MethodCall commit_method = MethodCallFactory.create(MethodDeclarations.commitMethod,
- gtx);
+ MethodCall commit_method = MethodCallFactory.create(MethodDeclarations.commitMethod, gtx);
- log.debug("running remote commit for " + gtx
- + " and coord=" + cache.getLocalAddress());
+ if (log.isDebugEnabled())
+ log.debug("running remote commit for " + gtx + " and coord=" + cache.getLocalAddress());
replicateCall(commit_method, remoteCallSync);
}
catch (Exception e)
{
- log.fatal("commit failed", e);
+ log.error("Commit failed", e);
throw e;
}
}
- else
- {
- // ignoring
- }
}
protected void broadcastRollback(GlobalTransaction gtx) throws Throwable
@@ -227,20 +221,19 @@
if (cache.getMembers() != null && cache.getMembers().size() > 1)
{
- // 1. Multicast rollback() to all other members (excluding myself)
+ // Broadcast rollback() to all other members (excluding myself)
try
{
broadcastTxs.remove(gtx);
MethodCall rollback_method = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx);
- log.debug("running remote rollback for " + gtx
- + " and coord=" + cache.getLocalAddress());
+ if (log.isDebugEnabled())
+ log.debug("running remote rollback for " + gtx + " and coord=" + cache.getLocalAddress());
replicateCall(rollback_method, remoteCallSync);
-
}
catch (Exception e)
{
- log.error("rollback failed", e);
+ log.error("Rollback failed", e);
throw e;
}
}
@@ -249,19 +242,17 @@
private MethodCall mapDataVersionedMethodCalls(MethodCall m, TransactionWorkspace w)
{
Object[] origArgs = m.getArgs();
- return MethodCallFactory.create(m.getMethod(), origArgs[0], translate((List) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4]);
+ return MethodCallFactory.create(m.getMethod(), origArgs[0], translate((List<MethodCall>) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4]);
}
/**
* Translates a list of MethodCalls from non-versioned calls to versioned calls.
*/
- private List translate(List l, TransactionWorkspace w)
+ private List<MethodCall> translate(List<MethodCall> l, TransactionWorkspace w)
{
- List newList = new ArrayList();
- Iterator origCalls = l.iterator();
- while (origCalls.hasNext())
+ List<MethodCall> newList = new ArrayList<MethodCall>();
+ for (MethodCall origCall : l)
{
- MethodCall origCall = (MethodCall) origCalls.next();
if (MethodDeclarations.isDataGravitationMethod(origCall.getMethodId()))
{
// no need to translate data gravitation calls.
@@ -280,7 +271,7 @@
// build up the new arguments list for the new call. Identical to the original lis except that it has the
// data version tacked on to the end.
Object[] newArgs = new Object[origArgs.length + 1];
- for (int i = 0; i < origArgs.length; i++) newArgs[i] = origArgs[i];
+ System.arraycopy(origArgs, 0, newArgs, 0, origArgs.length);
newArgs[origArgs.length] = versionToBroadcast;
// now create a new method call which contains this data version
More information about the jboss-cvs-commits
mailing list