[jboss-cvs] JBossCache/src/org/jboss/cache/interceptors ...
Manik Surtani
msurtani at belmont.prod.atl2.jboss.com
Tue Aug 29 12:35:52 EDT 2006
User: msurtani
Date: 06/08/29 12:35:52
Modified: src/org/jboss/cache/interceptors
PessimisticLockInterceptor.java
ReplicationInterceptor.java
Log:
Fixed basic replication issues
Revision Changes Path
1.29 +212 -159 JBossCache/src/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: PessimisticLockInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/PessimisticLockInterceptor.java,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -b -r1.28 -r1.29
--- PessimisticLockInterceptor.java 25 Aug 2006 14:10:07 -0000 1.28
+++ PessimisticLockInterceptor.java 29 Aug 2006 16:35:52 -0000 1.29
@@ -19,8 +19,8 @@
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodDeclarations;
import javax.transaction.Transaction;
import java.util.Collections;
@@ -36,30 +36,34 @@
* current method and unlock when the method returns.
*
* @author Bela Ban
- * @version $Id: PessimisticLockInterceptor.java,v 1.28 2006/08/25 14:10:07 msurtani Exp $
+ * @version $Id: PessimisticLockInterceptor.java,v 1.29 2006/08/29 16:35:52 msurtani Exp $
*/
-public class PessimisticLockInterceptor extends Interceptor {
- TransactionTable tx_table=null;
+public class PessimisticLockInterceptor extends Interceptor
+{
+ TransactionTable tx_table = null;
- /** Map<Object, java.util.List>. Keys = threads, values = lists of locks held by that thread */
+ /**
+ * Map<Object, java.util.List>. Keys = threads, values = lists of locks held by that thread
+ */
Map lock_table;
private long lock_acquisition_timeout;
LockManager lockManager = new LockManager();
- public void setCache(CacheSPI cache) {
+ public void setCache(CacheSPI cache)
+ {
super.setCache(cache);
- tx_table=cache.getTransactionTable();
- lock_table=cache.getLockTable();
- lock_acquisition_timeout=cache.getConfiguration().getLockAcquisitionTimeout();
+ tx_table = cache.getTransactionTable();
+ lock_table = cache.getLockTable();
+ lock_acquisition_timeout = cache.getConfiguration().getLockAcquisitionTimeout();
}
-
- public Object invoke(MethodCall m) throws Throwable {
- Fqn fqn=null;
- DataNode.LockType lock_type=DataNode.LockType.NONE;
- Object[] args=m.getArgs();
+ public Object invoke(MethodCall m) throws Throwable
+ {
+ Fqn fqn = null;
+ DataNode.LockType lock_type = DataNode.LockType.NONE;
+ Object[] args = m.getArgs();
InvocationContext ctx = cache.getInvocationContext();
if (log.isTraceEnabled()) log.trace("PessimisticLockInterceptor invoked for method " + m);
@@ -84,9 +88,8 @@
* (keyed by TX) */
// List locks=null;
- boolean recursive=false;
- boolean createIfNotExists=false;
-
+ boolean recursive = false;
+ boolean createIfNotExists = false;
// 1. Determine the type of lock (read, write, or none) depending on the method. If no lock is required, invoke
// the method, then return immediately
@@ -96,24 +99,24 @@
case MethodDeclarations.putDataMethodLocal_id:
case MethodDeclarations.putDataEraseMethodLocal_id:
case MethodDeclarations.putKeyValMethodLocal_id:
- createIfNotExists=true;
- fqn=(Fqn)args[1];
- lock_type=DataNode.LockType.WRITE;
+ createIfNotExists = true;
+ fqn = (Fqn) args[1];
+ lock_type = DataNode.LockType.WRITE;
break;
case MethodDeclarations.removeNodeMethodLocal_id:
- fqn=(Fqn)args[1];
- lock_type=DataNode.LockType.WRITE;
- recursive=true; // remove node and *all* child nodes
+ fqn = (Fqn) args[1];
+ lock_type = DataNode.LockType.WRITE;
+ recursive = true; // remove node and *all* child nodes
break;
case MethodDeclarations.removeKeyMethodLocal_id:
case MethodDeclarations.removeDataMethodLocal_id:
case MethodDeclarations.addChildMethodLocal_id:
- fqn=(Fqn)args[1];
- lock_type=DataNode.LockType.WRITE;
+ fqn = (Fqn) args[1];
+ lock_type = DataNode.LockType.WRITE;
break;
case MethodDeclarations.evictNodeMethodLocal_id:
- fqn=(Fqn)args[0];
- lock_type=DataNode.LockType.WRITE;
+ fqn = (Fqn) args[0];
+ lock_type = DataNode.LockType.WRITE;
break;
case MethodDeclarations.getKeyValueMethodLocal_id:
case MethodDeclarations.getNodeMethodLocal_id:
@@ -121,13 +124,13 @@
case MethodDeclarations.getChildrenNamesMethodLocal_id:
case MethodDeclarations.releaseAllLocksMethodLocal_id:
case MethodDeclarations.printMethodLocal_id:
- fqn=(Fqn)args[0];
- lock_type=DataNode.LockType.READ;
+ fqn = (Fqn) args[0];
+ lock_type = DataNode.LockType.READ;
break;
case MethodDeclarations.lockMethodLocal_id:
- fqn=(Fqn)args[0];
- lock_type=(DataNode.LockType)args[1];
- recursive=((Boolean)args[2]).booleanValue();
+ fqn = (Fqn) args[0];
+ lock_type = (DataNode.LockType) args[1];
+ recursive = ((Boolean) args[2]).booleanValue();
break;
case MethodDeclarations.commitMethod_id:
// commit propagated up from the tx interceptor
@@ -150,132 +153,167 @@
// If no TX: add each acquired lock to the list of locks for this method (locks)
// If TX: [merge code from TransactionInterceptor]: register with TxManager, on commit/rollback,
// release the locks for the given TX
- if(fqn != null) {
- if(createIfNotExists) {
- do {
+ if (fqn != null)
+ {
+ if (createIfNotExists)
+ {
+ do
+ {
// TODO: WHat do we do about createIfNotExists flag?
lock(fqn, ctx.getGlobalTransaction(), lock_type, recursive, createIfNotExists);
}
- while(!cache.hasChild(fqn)); // keep trying until we have the lock (fixes concurrent remove())
+ while (!cache.hasChild(fqn)); // keep trying until we have the lock (fixes concurrent remove())
// terminates successfully, or with (Timeout)Exception
}
else
// TODO: WHat do we do about createIfNotExists flag?
+ {
lock(fqn, ctx.getGlobalTransaction(), lock_type, recursive, createIfNotExists);
}
- else {
- if(log.isTraceEnabled())
+ }
+ else
+ {
+ if (log.isTraceEnabled())
+ {
log.trace("bypassed locking as method " + m.getName() + "() doesn't require locking");
}
- if(m.getMethodId() == MethodDeclarations.lockMethodLocal_id)
+ }
+ if (m.getMethodId() == MethodDeclarations.lockMethodLocal_id)
+ {
return null;
+ }
return super.invoke(m);
}
-
/**
* Locks a given node.
+ *
* @param fqn
* @param gtx
* @param lock_type DataNode.LOCK_TYPE_READ, DataNode.LOCK_TYPE_WRITE or DataNode.LOCK_TYPE_NONE
* @param recursive Lock children recursively
*/
private void lock(Fqn fqn, GlobalTransaction gtx, DataNode.LockType lock_type, boolean recursive, boolean createIfNotExists)
- throws TimeoutException, LockingException, InterruptedException {
+ throws TimeoutException, LockingException, InterruptedException
+ {
Node n;
Node child_node;
Object child_name;
Thread currentThread = Thread.currentThread();
- Object owner = (gtx != null) ? (Object)gtx : currentThread;
+ Object owner = (gtx != null) ? (Object) gtx : currentThread;
int treeNodeSize;
boolean acquired;
if (log.isTraceEnabled()) log.trace("Attempting to lock node " + fqn + " for owner " + owner);
- if(fqn == null) {
+ if (fqn == null)
+ {
log.error("fqn is null - this should not be the case");
return;
}
- if((treeNodeSize=fqn.size()) == 0)
+ if ((treeNodeSize = fqn.size()) == 0)
+ {
return;
+ }
- if(configuration.getIsolationLevel() == IsolationLevel.NONE)
- lock_type=DataNode.LockType.NONE;
+ if (configuration.getIsolationLevel() == IsolationLevel.NONE)
+ {
+ lock_type = DataNode.LockType.NONE;
+ }
- n=cache;
- for(int i=0; i < treeNodeSize; i++) {
- child_name=fqn.get(i);
+ n = cache;
+ for (int i = 0; i < treeNodeSize; i++)
+ {
+ child_name = fqn.get(i);
cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
child_node = n.getChild(new Fqn(child_name));
if (child_node == null && createIfNotExists)
{
cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
- child_node= n.addChild(new Fqn(child_name));
+ child_node = n.addChild(new Fqn(child_name));
}
- if(child_node == null) {
- if(log.isTraceEnabled())
+ if (child_node == null)
+ {
+ if (log.isTraceEnabled())
+ {
log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
+ }
return;
}
- if(lock_type == DataNode.LockType.NONE) {
+ if (lock_type == DataNode.LockType.NONE)
+ {
// acquired=false;
- n=child_node;
+ n = child_node;
continue;
}
- else {
- if(lock_type == DataNode.LockType.WRITE && i == (treeNodeSize - 1)) {
+ else
+ {
+ if (lock_type == DataNode.LockType.WRITE && i == (treeNodeSize - 1))
+ {
//acquired=child_node.acquire(owner, lock_timeout, DataNode.LOCK_TYPE_WRITE);
acquired = lockManager.acquire(child_node, owner, DataNode.LockType.WRITE);
}
- else {
+ else
+ {
//acquired=child_node.acquire(owner, lock_timeout, DataNode.LOCK_TYPE_READ);
acquired = lockManager.acquire(child_node, owner, DataNode.LockType.READ);
}
}
- if(acquired) {
- if(gtx != null) {
+ if (acquired)
+ {
+ if (gtx != null)
+ {
// add the lock to the list of locks maintained for this transaction
// (needed for release of locks on commit or rollback)
cache.getTransactionTable().addLock(gtx, lockManager.getLock(child_node));
}
- else {
- IdentityLock l=lockManager.getLock(child_node);
+ else
+ {
+ IdentityLock l = lockManager.getLock(child_node);
List locks = getLocks(currentThread);
- if(!locks.contains(l))
+ if (!locks.contains(l))
+ {
locks.add(l);
}
}
+ }
- if(recursive && i == (treeNodeSize - 1)) {
+ if (recursive && i == (treeNodeSize - 1))
+ {
//Set acquired_locks=child_node.acquireAll(owner, lock_timeout, lock_type);
Set acquired_locks = lockManager.acquireAll(child_node, owner, lock_type);
- if(acquired_locks.size() > 0) {
- if(gtx != null) {
+ if (acquired_locks.size() > 0)
+ {
+ if (gtx != null)
+ {
cache.getTransactionTable().addLocks(gtx, acquired_locks);
}
- else {
+ else
+ {
List locks = getLocks(currentThread);
locks.addAll(acquired_locks);
}
}
}
- n=child_node;
+ n = child_node;
}
}
- private List getLocks(Thread currentThread) {
+ private List getLocks(Thread currentThread)
+ {
// This sort of looks like a get/put race condition, but
// since we key off the Thread, it's not
- List locks = (List)lock_table.get(currentThread);
- if (locks == null) {
+ List locks = (List) lock_table.get(currentThread);
+ if (locks == null)
+ {
locks = Collections.synchronizedList(new LinkedList());
lock_table.put(currentThread, locks);
}
@@ -286,16 +324,19 @@
private void createNodes(Fqn fqn, GlobalTransaction gtx)
{
int treeNodeSize;
- if ((treeNodeSize=fqn.size()) == 0) return;
- Node n=cache;
- for(int i=0; i < treeNodeSize; i++)
+ if ((treeNodeSize = fqn.size()) == 0) return;
+ Node n = cache;
+ for (int i = 0; i < treeNodeSize; i++)
{
- Object child_name=fqn.get(i);
+ Object child_name = fqn.get(i);
cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
- Node child_node= n.addChild(new Fqn(child_name)); //, gtx, true);
- if(child_node == null)
+ Node child_node = n.addChild(new Fqn(child_name)); //, gtx, true);
+ if (child_node == null)
{
- if(log.isTraceEnabled()) log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
+ if (log.isTraceEnabled())
+ {
+ log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
+ }
return;
}
n = child_node;
@@ -305,14 +346,19 @@
/**
* Remove all locks held by <tt>tx</tt>, remove the transaction from the transaction table
+ *
* @param gtx
*/
- private void commit(GlobalTransaction gtx) {
- if(log.isTraceEnabled())
+ private void commit(GlobalTransaction gtx)
+ {
+ if (log.isTraceEnabled())
+ {
log.trace("committing cache with gtx " + gtx);
+ }
- TransactionEntry entry=tx_table.get(gtx);
- if(entry == null) {
+ TransactionEntry entry = tx_table.get(gtx);
+ if (entry == null)
+ {
log.error("entry for transaction " + gtx + " not found (maybe already committed)");
return;
}
@@ -320,9 +366,11 @@
// Let's do it in stack style, LIFO
entry.releaseAllLocksLIFO(gtx);
- Transaction ltx=entry.getTransaction();
- if(log.isTraceEnabled())
+ Transaction ltx = entry.getTransaction();
+ if (log.isTraceEnabled())
+ {
log.trace("removing local transaction " + ltx + " and global transaction " + gtx);
+ }
tx_table.remove(ltx);
tx_table.remove(gtx);
}
@@ -339,15 +387,19 @@
*
* @param tx
*/
- private void rollback(GlobalTransaction tx) {
+ private void rollback(GlobalTransaction tx)
+ {
List undo_ops;
- TransactionEntry entry=tx_table.get(tx);
+ TransactionEntry entry = tx_table.get(tx);
MethodCall undo_op;
- if(log.isTraceEnabled())
+ if (log.isTraceEnabled())
+ {
log.trace("called to rollback cache with GlobalTransaction=" + tx);
+ }
- if(entry == null) {
+ if (entry == null)
+ {
log.error("entry for transaction " + tx + " not found (transaction has possibly already been rolled back)");
return;
}
@@ -370,15 +422,16 @@
// }
// }
-
// 3. Finally, release all locks held by this TX
// Let's do it in stack style, LIFO
// Note that the lock could have been released already so don't panic.
entry.releaseAllLocksLIFO(tx);
- Transaction ltx=entry.getTransaction();
- if(log.isTraceEnabled())
+ Transaction ltx = entry.getTransaction();
+ if (log.isTraceEnabled())
+ {
log.trace("removing local transaction " + ltx + " and global transaction " + tx);
+ }
tx_table.remove(ltx);
tx_table.remove(tx);
}
1.39 +118 -114 JBossCache/src/org/jboss/cache/interceptors/ReplicationInterceptor.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ReplicationInterceptor.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/interceptors/ReplicationInterceptor.java,v
retrieving revision 1.38
retrieving revision 1.39
diff -u -b -r1.38 -r1.39
--- ReplicationInterceptor.java 25 Aug 2006 14:10:07 -0000 1.38
+++ ReplicationInterceptor.java 29 Aug 2006 16:35:52 -0000 1.39
@@ -4,8 +4,8 @@
import org.jboss.cache.InvocationContext;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
-import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodDeclarations;
/**
* Takes care of replicating modifications to other nodes in a cluster. Also
@@ -13,7 +13,7 @@
* 'side-ways' (see docs/design/Refactoring.txt).
*
* @author Bela Ban
- * @version $Id: ReplicationInterceptor.java,v 1.38 2006/08/25 14:10:07 msurtani Exp $
+ * @version $Id: ReplicationInterceptor.java,v 1.39 2006/08/29 16:35:52 msurtani Exp $
*/
public class ReplicationInterceptor extends BaseRpcInterceptor
{
@@ -25,9 +25,9 @@
boolean isLocalCommitOrRollback = gtx != null && !gtx.isRemote() && (m.getMethodId() == MethodDeclarations.commitMethod_id || m.getMethodId() == MethodDeclarations.rollbackMethod_id);
-
// pass up the chain if not a local commit or rollback (in which case replicate first)
Object o = isLocalCommitOrRollback ? null : super.invoke(m);
+// ctx = cache.getInvocationContext();
Option optionOverride = ctx.getOptionOverrides();
@@ -44,7 +44,7 @@
if (gtx != null && !gtx.isRemote())
{
// lets see what sort of method we've got.
- switch(m.getMethodId())
+ switch (m.getMethodId())
{
case MethodDeclarations.commitMethod_id:
// REPL_ASYNC will result in only a prepare() method - 1 phase commit.
@@ -53,14 +53,18 @@
o = super.invoke(m);
break;
case MethodDeclarations.prepareMethod_id:
- if (containsModifications(m)) {
+ if (containsModifications(m))
+ {
// this is a prepare method
runPreparePhase(m, gtx);
}
break;
case MethodDeclarations.rollbackMethod_id:
// REPL_ASYNC will result in only a prepare() method - 1 phase commit.
- if (containsModifications(m) && !ctx.isLocalRollbackOnly()) replicateCall(m, configuration.isSyncRollbackPhase());
+ if (containsModifications(m) && !ctx.isLocalRollbackOnly())
+ {
+ replicateCall(m, configuration.isSyncRollbackPhase());
+ }
// now pass up the chain
o = super.invoke(m);
break;
More information about the jboss-cvs-commits mailing list