Author: mircea.markus
Date: 2007-11-29 17:44:11 -0500 (Thu, 29 Nov 2007)
New Revision: 4792
Added:
core/trunk/src/main/java/org/jboss/cache/interceptors/MethodDispacherInterceptor.java
Modified:
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/interceptors/ActivationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheMgmtInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/NotificationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/PassivationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/UnlockInterceptor.java
core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java
core/trunk/src/main/java/org/jboss/cache/lock/ReadWriteLockWithUpgrade.java
core/trunk/src/main/java/org/jboss/cache/marshall/MethodCall.java
core/trunk/src/main/java/org/jboss/cache/marshall/MethodDeclarations.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
Log:
refactoring of the interceptor hierachy
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2007-11-29 17:10:28
UTC (rev 4791)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2007-11-29 22:44:11
UTC (rev 4792)
@@ -55,7 +55,16 @@
/**
* Data invalidated asynchronously.
*/
- INVALIDATION_ASYNC
+ INVALIDATION_ASYNC;
+
+ /**
+ * Returns true if the mode is invalidation, either sync or async.
+ */
+ public boolean isInvalidation()
+ {
+ return this.equals(INVALIDATION_SYNC) || this.equals(INVALIDATION_SYNC);
+ }
+
}
public static CacheMode legacyModeToCacheMode(int legacyMode)
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/ActivationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/ActivationInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/ActivationInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -1,16 +1,15 @@
package org.jboss.cache.interceptors;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.Modification;
-import org.jboss.cache.NodeSPI;
+import org.jboss.cache.*;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import org.jgroups.Address;
import javax.transaction.TransactionManager;
+import javax.transaction.SystemException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
@@ -48,134 +47,163 @@
isActivation = true;
}
- /**
- * Makes sure a node is loaded into memory before a call executes. If node is
- * already loaded and its attributes already initialized, then remove it from
- * the cache loader and notify the cache listeners that the node has been activated.
- *
- * @return
- * @throws Throwable
- */
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
{
- MethodCall m = ctx.getMethodCall();
- Fqn fqn = null;
- Object[] args = m.getArgs();
- Object retval;
-
- // First call the parent class to load the node
- retval = super.invoke(ctx);
-
- // is this a node removal operation?
- boolean removeData = false, nodeRemoved = false;
-
- // Could be TRANSACTIONAL. If so, we register for TX completion (if we haven't
done so yet)
- if (tx_mgr != null && tx_mgr.getTransaction() != null)
+ Object returnValue = super.handleRemoveDataMethod(ctx, tx, fqn, createUndoOps);
+ if (log.isTraceEnabled())
{
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- switch (m.getMethodId())
- {
- case MethodDeclarations.commitMethod_id:
- if (hasModifications(args))
- {
- loader.commit(gtx);
- if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
- {
- Integer acts = (Integer) m_txActivations.get(gtx);
- if (acts != null)
- {
- m_activations = m_activations + acts;
- }
- m_txActivations.remove(gtx);
- }
- }
- break;
- case MethodDeclarations.rollbackMethod_id:
- if (hasModifications(args))
- {
- loader.rollback(gtx);
- if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
- {
- m_txActivations.remove(gtx);
- }
- }
- break;
- case MethodDeclarations.optimisticPrepareMethod_id:
- case MethodDeclarations.prepareMethod_id:
- prepareCacheLoader(ctx);
- break;
- }
+ log.trace("This is a remove data operation; removing the data from the
loader, no activation processing needed.");
}
+ loader.removeData(fqn);
+ return returnValue;
+ }
- // CacheLoaderInterceptor normally doesn't load the node
- // since CacheStoreInterceptor.put() returns the old value
- switch (m.getMethodId())
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ Object returnValue = super.handleRemoveNodeMethod(ctx, tx, fqn, createUndoOps);
+ if (log.isTraceEnabled())
{
- case MethodDeclarations.putForExternalReadMethodLocal_id:
- case MethodDeclarations.putDataMethodLocal_id:
- case MethodDeclarations.putDataEraseMethodLocal_id:
- case MethodDeclarations.putKeyValMethodLocal_id:
- case MethodDeclarations.removeKeyMethodLocal_id:
- case MethodDeclarations.addChildMethodLocal_id:
- fqn = (Fqn) args[1];
- break;
- case MethodDeclarations.getKeyValueMethodLocal_id:
- case MethodDeclarations.getNodeMethodLocal_id:
- case MethodDeclarations.getKeysMethodLocal_id:
- case MethodDeclarations.getChildrenNamesMethodLocal_id:
- case MethodDeclarations.releaseAllLocksMethodLocal_id:
- case MethodDeclarations.printMethodLocal_id:
- fqn = (Fqn) args[0];
- break;
- case MethodDeclarations.removeNodeMethodLocal_id:
- nodeRemoved = true;
- fqn = (Fqn) args[1];
- break;
- case MethodDeclarations.removeDataMethodLocal_id:
- removeData = true;
- fqn = (Fqn) args[1];
- break;
+ log.trace("This is a remove operation; removing the node from the loader,
no activation processing needed.");
}
+ loader.remove(fqn);
+ return returnValue;
+ }
+ protected Object handlePrintMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ Object returnValue = super.handlePrintMethod(ctx, fqn);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handleReleaseAllLocksMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ Object returnValue = super.handleReleaseAllLocksMethod(ctx, fqn);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handleGetChildrenNamesMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ Object returnValue = super.handleGetChildrenNamesMethod(ctx, fqn);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handleGetKeysMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ Object returnValue = super.handleGetKeysMethod(ctx, fqn);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handleGetNodeMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ Object returnValue = super.handleGetNodeMethod(ctx, fqn);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key,
boolean sendNodeEvent) throws Throwable
+ {
+ Object returnValue = super.handleGetKeyValueMethod(ctx, fqn, key, sendNodeEvent);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handleAddChildMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
parentFqn, Object childName, Node cn, boolean createUndoOps) throws Throwable
+ {
+ Object returnValue = super.handleAddChildMethod(ctx, tx, parentFqn, childName, cn,
createUndoOps);
+ removeNodeFromCacheLoader(ctx, parentFqn);
+ return returnValue;
+ }
+
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
+ {
+ Object returnValue = super.handlePutForExternalReadMethod(ctx, tx, fqn, key,
value);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
+ {
+ Object returnValue = super.handlePutDataMethod(ctx, tx, fqn, data, createUndoOps);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ Object returnValue = super.handlePutKeyValueMethod(ctx, gtx, fqn, key, value,
createUndoOps);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
+ {
+ Object returnValue = super.handleRemoveKeyMethod(ctx, tx, fqn, key,
createUndoOps);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
+ {
+ Object returnValue = super.handlePutDataEraseMethod(ctx, gt, fqn, newData,
createUndoOps, eraseContents);
+ removeNodeFromCacheLoader(ctx, fqn);
+ return returnValue;
+ }
+
+ /**
+ * Remove the node from the cache loader if it exists in memory,
+ * its attributes have been initialized, its children have been loaded,
+ * AND it was found in the cache loader (nodeLoaded = true).
+ * Then notify the listeners that the node has been activated.
+ */
+ private void removeNodeFromCacheLoader(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
NodeSPI n;
- if (fqn != null)
+ if (((n = getNode(fqn)) != null) && n.isDataLoaded() &&
loader.exists(fqn))
{
- if (nodeRemoved)
+ // node not null and attributes have been loaded?
+ if (!n.getChildrenDirect().isEmpty())
{
- log.trace("This is a remove operation; removing the node from the
loader, no activation processing needed.");
- loader.remove(fqn);
- }
- else if (removeData)
- {
- log.trace("This is a remove data operation; removing the data from the
loader, no activation processing needed.");
- loader.removeData(fqn);
- }
- else if (((n = getNode(fqn)) != null) && n.isDataLoaded() &&
loader.exists(fqn))
- {
- // Remove the node from the cache loader if it exists in memory,
- // its attributes have been initialized, its children have been loaded,
- // AND it was found in the cache loader (nodeLoaded = true).
- // Then notify the listeners that the node has been activated.
-
- // node not null and attributes have been loaded?
- if (!n.getChildrenDirect().isEmpty())
+ if (allInitialized(n))
{
- if (allInitialized(n))
- {
- log.debug("children all initialized");
- remove(ctx, fqn);
- }
- }
- else if (loaderNoChildren(fqn))
- {
- if (log.isDebugEnabled()) log.debug("no children " + n);
+ log.debug("children all initialized");
remove(ctx, fqn);
}
+ } else if (loaderNoChildren(fqn))
+ {
+ if (log.isDebugEnabled()) log.debug("no children " + n);
+ remove(ctx, fqn);
}
}
+ }
+
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
+ {
+ Object retval = nextInterceptor(ctx);
+ if (inTransaction()) {
+ prepareCacheLoader(ctx);
+ }
return retval;
}
+ private boolean inTransaction()
+ throws SystemException
+ {
+ return tx_mgr != null && tx_mgr.getTransaction() != null;
+ }
+
+ protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx,
List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
+ {
+ Object retval = nextInterceptor(ctx);
+ if (inTransaction()) {
+ prepareCacheLoader(ctx);
+ }
+ return retval;
+ }
+
private void remove(InvocationContext ctx, Fqn fqn) throws Exception
{
cache.getNotifier().notifyNodeActivated(fqn, true, Collections.emptyMap(), ctx);
@@ -195,7 +223,6 @@
{
return false;
}
-
for (NodeSPI child : n.getChildrenDirect())
{
if (!child.isDataLoaded())
@@ -204,6 +231,7 @@
}
}
return true;
+
}
/**
@@ -246,17 +274,6 @@
return retval;
}
- protected boolean hasModifications(Object[] args)
- {
- int hint = 1;
- if (args[hint] instanceof Boolean) return (Boolean) args[hint];
- for (Object arg : args)
- {
- if (arg instanceof Boolean) return (Boolean) arg;
- }
- return false;
- }
-
private void prepareCacheLoader(InvocationContext ctx) throws Exception
{
List<MethodCall> modifications;
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -24,7 +24,7 @@
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
*/
-public abstract class BaseRpcInterceptor extends Interceptor
+public abstract class BaseRpcInterceptor extends MethodDispacherInterceptor
{
private BuddyManager buddyManager;
@@ -43,9 +43,6 @@
/**
* Checks whether any of the responses are exceptions. If yes, re-throws
* them (as exceptions or runtime exceptions).
- *
- * @param rsps
- * @throws Throwable
*/
protected void checkResponses(List rsps) throws Throwable
{
@@ -124,6 +121,7 @@
cache.getRPCManager().getReplicationQueue().add(MethodCallFactory.create(MethodDeclarations.replicateMethod,
call));
}
+ //todo info expt for this is InvocationContext, move method there
protected boolean containsModifications(InvocationContext ctx)
{
switch (ctx.getMethodCall().getMethodId())
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -17,7 +17,7 @@
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
*/
-public abstract class BaseTransactionalContextInterceptor extends Interceptor
+public abstract class BaseTransactionalContextInterceptor extends
MethodDispacherInterceptor
{
protected TransactionTable txTable;
protected TransactionManager txManager;
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -1,10 +1,6 @@
package org.jboss.cache.interceptors;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.NodeSPI;
+import org.jboss.cache.*;
import static org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.lock.NodeLock;
@@ -14,6 +10,7 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import org.apache.commons.logging.Log;
import java.util.Collections;
import java.util.HashMap;
@@ -28,7 +25,7 @@
* @author Bela Ban
* @version $Id$
*/
-public class CacheLoaderInterceptor extends Interceptor implements
CacheLoaderInterceptorMBean
+public class CacheLoaderInterceptor extends MethodDispacherInterceptor implements
CacheLoaderInterceptorMBean
{
private long m_cacheLoads = 0;
private long m_cacheMisses = 0;
@@ -44,143 +41,198 @@
*/
protected boolean useCacheStore = true;
+ protected Log getLog()
+ {
+ return log;
+ }
+
public void setCache(CacheSPI cache)
{
super.setCache(cache);
txTable = cache.getTransactionTable();
this.loader = cache.getCacheLoaderManager().getCacheLoader();
CacheMode mode = cache.getConfiguration().getCacheMode();
- usingOptimisticInvalidation = cache.getConfiguration().isNodeLockingOptimistic()
&&
- ((mode == CacheMode.INVALIDATION_ASYNC) || (mode ==
CacheMode.INVALIDATION_SYNC));
+ usingOptimisticInvalidation = cache.getConfiguration().isNodeLockingOptimistic()
&& mode.isInvalidation();
}
- /**
- * Makes sure a node is loaded into memory before a call executes (no-op if node is
already loaded). If attributes
- * of a node are to be accessed by the method, the attributes are also loaded.
- *
- * @return
- * @throws Throwable
- */
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
{
- MethodCall m = ctx.getMethodCall();
- Fqn fqn = null, fqn2 = null;// if set, load the data. fqn2 for 2nd fqn in move().
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, false, true, false, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
- Object[] args = m.getArgs();
- boolean acquireLock = false;// do we need to acquire a lock if we load this node
from cloader?
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, false, true, false, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
- boolean initNode = false;// keep uninitialized
- Object key = null;
- TransactionEntry entry = null;
- GlobalTransaction gtx;
- boolean recursive = false;// do we also load children?
- boolean bypassLoadingData = false;
- boolean allKeys = false; // is method a getter asking for all data keys?
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, key, false, useCacheStore, !useCacheStore,
ctx.getMethodCall(), getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
- if ((gtx = ctx.getGlobalTransaction()) != null)
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ if (fqn != null)
{
- entry = txTable.get(gtx);
+ loadIfNeeded(ctx, fqn, key, false, useCacheStore, !useCacheStore,
ctx.getMethodCall(), getTransactionEntry(ctx), false, false, false);
}
+ return nextInterceptor(ctx);
+ }
- if (log.isTraceEnabled())
+ protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
+ {
+ if (from != null)
{
- log.trace("invoke " + m);
+ if (to != null)
+ {
+ loadIfNeeded(ctx, to, null, false, false, true, ctx.getMethodCall(),
getTransactionEntry(ctx), false, true, false);
+ }
+ loadIfNeeded(ctx, from, null, false, false, true, ctx.getMethodCall(),
getTransactionEntry(ctx), true, true, false);
}
- switch (m.getMethodId())
+
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleAddChildMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
parentFqn, Object childName, Node cn, boolean createUndoOps) throws Throwable
+ {
+
+ if (parentFqn != null)
{
- case MethodDeclarations.putDataEraseMethodLocal_id:
- case MethodDeclarations.putDataMethodLocal_id:
- fqn = (Fqn) args[1];
- initNode = true;
- break;
- case MethodDeclarations.putForExternalReadMethodLocal_id:
- case MethodDeclarations.putKeyValMethodLocal_id:
- fqn = (Fqn) args[1];
- key = args[2];
- if (useCacheStore)
+ loadIfNeeded(ctx, parentFqn, null, false, false, false, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key,
boolean sendNodeEvent) throws Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, key, false, false, true, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleGetNodeMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, false, false, true, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, !usingOptimisticInvalidation);
+ }
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleGetChildrenNamesMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, false, false, false, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, true);
+ }
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleReleaseAllLocksMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, false, false, true, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handlePrintMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, false, false, true, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleGetKeysMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, true, false, true, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleGetDataMapMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ if (fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, true, false, true, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ // clean up nodesCreated map
+ boolean traceEnabled = log.isTraceEnabled();
+ log.trace("Removing temporarily created nodes from treecache");
+
+ // this needs to be done in reverse order.
+ List list = getTransactionEntry(ctx).getDummyNodesCreatedByCacheLoader();
+ if (list != null && list.size() > 0)
+ {
+ ListIterator i = list.listIterator(list.size());
+ while (i.hasPrevious())
+ {
+ Fqn fqn = (Fqn) i.previous();
+ try
{
- initNode = true;
+ cache.evict(fqn, false);
}
- else
+ catch (CacheException e)
{
- acquireLock = true;
+ if (traceEnabled) log.trace("Unable to evict node " + fqn, e);
}
- break;
- case MethodDeclarations.moveMethodLocal_id:
- fqn = (Fqn) args[0];
- fqn2 = (Fqn) args[1];
- acquireLock = true;
- //initNode = true;
- recursive = true;
- break;
- case MethodDeclarations.addChildMethodLocal_id:
- fqn = (Fqn) args[1];
- break;
- case MethodDeclarations.getKeyValueMethodLocal_id:
- fqn = (Fqn) args[0];
- key = args[1];
- acquireLock = true;
- break;
- case MethodDeclarations.getNodeMethodLocal_id:
- bypassLoadingData = !usingOptimisticInvalidation;
- fqn = (Fqn) args[0];
- acquireLock = true;
- break;
- case MethodDeclarations.getChildrenNamesMethodLocal_id:
- bypassLoadingData = true;
- case MethodDeclarations.releaseAllLocksMethodLocal_id:
- case MethodDeclarations.printMethodLocal_id:
- fqn = (Fqn) args[0];
- acquireLock = true;
- break;
- case MethodDeclarations.getKeysMethodLocal_id:
- case MethodDeclarations.getDataMapMethodLocal_id:
- allKeys = true;
- fqn = (Fqn) args[0];
- acquireLock = true;
- break;
- case MethodDeclarations.rollbackMethod_id:
- // clean up nodesCreated map
- cleanupNodesCreated(entry);
- break;
- case MethodDeclarations.removeNodeMethodLocal_id:
- if (cache.getConfiguration().isNodeLockingOptimistic())
- {
- fqn = (Fqn) args[1];
- }
- break;
- default:
- if (!useCacheStore)
- {
- if (m.getMethodId() == MethodDeclarations.removeKeyMethodLocal_id)
- {
- fqn = (Fqn) args[1];
- }
- else if (m.getMethodId() == MethodDeclarations.removeDataMethodLocal_id)
- {
- fqn = (Fqn) args[1];
- initNode = true;
- }
- }
- break;
+ }
}
+ return nextInterceptor(ctx);
+ }
- /* On the way in: load elements into cache from the CacheLoader if not yet in the
cache. We need to synchronize
- this so only 1 thread attempts to load a given element */
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ if (cache.getConfiguration().isNodeLockingOptimistic() && fqn != null)
+ {
+ loadIfNeeded(ctx, fqn, null, false, false, false, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
+ }
- if (fqn != null)
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
+ {
+ if (fqn != null && !useCacheStore)
{
- if (fqn2 != null)
- {
- loadIfNeeded(ctx, fqn2, key, allKeys, initNode, acquireLock, m, entry, false,
m.getMethodId() == MethodDeclarations.moveMethodLocal_id, bypassLoadingData);
- }
- loadIfNeeded(ctx, fqn, key, allKeys, initNode, acquireLock, m, entry, recursive,
m.getMethodId() == MethodDeclarations.moveMethodLocal_id, bypassLoadingData);
+ loadIfNeeded(ctx, fqn, key, false, false, false, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
}
+ return nextInterceptor(ctx);
+ }
- return super.invoke(ctx);
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ if (fqn != null && !useCacheStore)
+ {
+ loadIfNeeded(ctx, fqn, null, false, true, false, ctx.getMethodCall(),
getTransactionEntry(ctx), false, false, false);
+ }
+ return nextInterceptor(ctx);
}
-
private void loadIfNeeded(InvocationContext ctx, Fqn fqn, Object key, boolean allKeys,
boolean initNode, boolean acquireLock, MethodCall m, TransactionEntry entry, boolean
recursive, boolean isMove, boolean bypassLoadingData) throws Throwable
{
NodeSPI n = cache.peek(fqn, true, true);
@@ -380,7 +432,7 @@
// hacky
cache.getInterceptorChain().get(0).invoke(InvocationContext.fromMethodCall(m));
-// super.invoke(m);
+// nextInterceptor(m);
}
/**
@@ -391,22 +443,18 @@
protected NodeSPI getNode(Fqn fqn)
{
return cache.peek(fqn, true);
- // int treeNodeSize = fqn.size();
- //
- // // root node
- // Node n = cache.getRoot();
- // Node child_node;
- // Object child_name;
- // for (int i = 0; i < treeNodeSize && n != null; i++)
- // {
- // child_name = fqn.get(i);
- //
cache.getInvocationContext().getOptionOverrides().setBypassInterceptorChain(true);
- // child_node = n.getChild(new Fqn(child_name));
- // n = child_node;
- // }
- // return n;
}
+ private TransactionEntry getTransactionEntry(InvocationContext ctx)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ if (gtx != null)
+ {
+ return txTable.get(gtx);
+ }
+ return null;
+ }
+
/**
* Returns true if the FQN or parent was removed during the current
* transaction.
@@ -534,32 +582,7 @@
return (NodeSPI) children.get(child_name);
}
- private void cleanupNodesCreated(TransactionEntry entry)
- {
- boolean traceEnabled = log.isTraceEnabled();
- log.trace("Removing temporarily created nodes from treecache");
- // this needs to be done in reverse order.
- List list = entry.getDummyNodesCreatedByCacheLoader();
- if (list != null && list.size() > 0)
- {
- ListIterator i = list.listIterator(list.size());
- while (i.hasPrevious())
- {
- Fqn fqn = (Fqn) i.previous();
- try
- {
- cache.evict(fqn, false);
- }
- catch (CacheException e)
- {
- if (traceEnabled) log.trace("Unable to evict node " + fqn, e);
- }
- }
- }
- }
-
-
private Map loadData(Fqn fqn) throws Exception
{
@@ -578,28 +601,6 @@
m_cacheMisses++;
}
}
-
- // BES Jan-4-2007 Stop doing this; it's annoying and people
- // should have converted by now
- // if (!nodeExists && isCustomCacheLoader)
- // {
- // warnCustom();
- // }
-
- // BES Jan-21-2007 Do the notifications in loadNode, before and after
- // we create nodes
-// if (nodeExists)
-// {
-// cache.getNotifier().notifyNodeLoaded(fqn, true, Collections.emptyMap(),
true);
-// cache.getNotifier().notifyNodeLoaded(fqn, false, nodeData, true);
-//
-// if (isActivation)
-// {
-// cache.getNotifier().notifyNodeActivated(fqn, true, true);
-// cache.getNotifier().notifyNodeActivated(fqn, false, true);
-// }
-// }
-
return nodeData;
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheMgmtInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheMgmtInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheMgmtInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -23,8 +23,10 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.InvocationContext;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.optimistic.DataVersion;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.apache.commons.logging.Log;
import java.util.HashMap;
import java.util.Map;
@@ -35,9 +37,7 @@
* @author Jerry Gauthier
* @version $Id$
*/
-public class CacheMgmtInterceptor
- extends Interceptor
- implements CacheMgmtInterceptorMBean
+public class CacheMgmtInterceptor extends MethodDispacherInterceptor implements
CacheMgmtInterceptorMBean
{
private long m_hit_times = 0;
private long m_miss_times = 0;
@@ -54,76 +54,90 @@
super.setCache(cache);
}
- /**
- * Pass the method on and capture cache statistics
- *
- * @return
- * @throws Throwable
- */
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Log getLog()
{
- MethodCall m = ctx.getMethodCall();
- Map attributes;
- Object[] args = m.getArgs();
- Object retval;
+ return log;
+ }
- // if statistics not enabled, execute the method and return
- if (!getStatisticsEnabled())
- return super.invoke(ctx);
+ protected boolean skipMethodCall(InvocationContext ctx)
+ {
+ return !getStatisticsEnabled();
+ }
- long t1, t2;
- switch (m.getMethodId())
+ protected Object handleEvictMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ Object returnValue = nextInterceptor(ctx);
+ m_evictions++;
+ return returnValue;
+ }
+
+ protected Object handleEvictVersionedNodeMethod(InvocationContext ctx, Fqn fqn,
DataVersion dataVersion) throws Throwable
+ {
+ Object returnValue = nextInterceptor(ctx);
+ m_evictions++;
+ return returnValue;
+ }
+
+ protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key,
boolean sendNodeEvent) throws Throwable
+ {
+ long t1 = System.currentTimeMillis();
+ Object retval = super.nextInterceptor(ctx);
+ long t2 = System.currentTimeMillis();
+ if (retval == null)
{
- case MethodDeclarations.getKeyValueMethodLocal_id:
- //fqn = (Fqn) args[0];
- //key = args[1];
- t1 = System.currentTimeMillis();
- retval = super.invoke(ctx);
- t2 = System.currentTimeMillis();
- if (retval == null)
- {
- m_miss_times = m_miss_times + (t2 - t1);
- m_misses++;
- }
- else
- {
- m_hit_times = m_hit_times + (t2 - t1);
- m_hits++;
- }
- break;
- case MethodDeclarations.putForExternalReadMethodLocal_id:
- case MethodDeclarations.putKeyValMethodLocal_id:
- t1 = System.currentTimeMillis();
- retval = super.invoke(ctx);
- t2 = System.currentTimeMillis();
- m_store_times = m_store_times + (t2 - t1);
- m_stores++;
- break;
- case MethodDeclarations.putDataMethodLocal_id:
- case MethodDeclarations.putDataEraseMethodLocal_id:
- //fqn = (Fqn) args[1];
- attributes = (Map) args[2];
- t1 = System.currentTimeMillis();
- retval = super.invoke(ctx);
- t2 = System.currentTimeMillis();
+ m_miss_times = m_miss_times + (t2 - t1);
+ m_misses++;
+ } else
+ {
+ m_hit_times = m_hit_times + (t2 - t1);
+ m_hits++;
+ }
+ return retval;
+ }
- if (attributes != null && attributes.size() > 0)
- {
- m_store_times = m_store_times + (t2 - t1);
- m_stores = m_stores + attributes.size();
- }
- break;
- case MethodDeclarations.evictNodeMethodLocal_id:
- case MethodDeclarations.evictVersionedNodeMethodLocal_id:
- //fqn = (Fqn) args[0];
- retval = super.invoke(ctx);
- m_evictions++;
- break;
- default:
- retval = super.invoke(ctx);
- break;
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
+ {
+ return handlePutData(ctx, data);
+ }
+
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
+ {
+ return handlePutData(ctx, newData);
+ }
+
+ private Object handlePutData(InvocationContext ctx, Map data)
+ throws Throwable
+ {
+ long t1 = System.currentTimeMillis();
+ Object retval = nextInterceptor(ctx);
+ long t2 = System.currentTimeMillis();
+
+ if (data != null && data.size() > 0)
+ {
+ m_store_times = m_store_times + (t2 - t1);
+ m_stores = m_stores + data.size();
}
+ return retval;
+ }
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
+ {
+ return handlePutExternalAndKeyValue(ctx);
+ }
+
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ return handlePutExternalAndKeyValue(ctx);
+ }
+
+ private Object handlePutExternalAndKeyValue(InvocationContext ctx)
+ throws Throwable
+ {
+ long t1 = System.currentTimeMillis();
+ Object retval = nextInterceptor(ctx);
+ long t2 = System.currentTimeMillis();
+ m_store_times = m_store_times + (t2 - t1);
+ m_stores++;
return retval;
}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -13,8 +13,11 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import org.jgroups.Address;
+import org.apache.commons.logging.Log;
import javax.transaction.TransactionManager;
+import javax.transaction.SystemException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
@@ -31,7 +34,7 @@
* @author Bela Ban
* @version $Id$
*/
-public class CacheStoreInterceptor extends Interceptor implements
CacheStoreInterceptorMBean
+public class CacheStoreInterceptor extends MethodDispacherInterceptor implements
CacheStoreInterceptorMBean
{
protected CacheLoaderConfig loaderConfig = null;
@@ -42,6 +45,11 @@
private long m_cacheStores = 0;
protected CacheLoader loader;
+ protected Log getLog()
+ {
+ return log;
+ }
+
public void setCache(CacheSPI cache)
{
super.setCache(cache);
@@ -52,190 +60,231 @@
}
/**
- * Pass the method on. When it returns, store the modification back to the store using
the CacheLoader.
- * In case of a transaction, register for TX completion (2PC) and at TX commit, write
modifications made
- * under the given TX to the CacheLoader
- *
- * @return
- * @throws Throwable
+ * if this is a shared cache loader and the call is of remote origin, pass up the
chain
*/
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected boolean skipMethodCall(InvocationContext ctx)
{
-
- MethodCall m = ctx.getMethodCall();
- // if this is a shared cache loader and the call is of remote origin, pass up the
chain. - Manik
- // see
http://www.jboss.com/index.html?module=bb&op=viewtopic&t=76090
-
if (!ctx.isOriginLocal() && loaderConfig.isShared())
{
- log.trace("Passing up method call and bypassing this interceptor since the
cache loader is shared and this call originated remotely.");
- return super.invoke(ctx);
+ if (log.isTraceEnabled())
+ {
+ log.trace("Passing up method call and bypassing this interceptor since
the cache loader is shared and this call originated remotely.");
+ }
+ return true;
}
+ return false;
+ }
- Fqn fqn;
- Object key, value;
- Object[] args = m.getArgs();
- Object retval, tmp_retval = null;
- boolean use_tmp_retval = false;
-
-
- if (log.isTraceEnabled())
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction gtx)
throws Throwable
+ {
+ if (inTransaction())
{
- log.trace("invoke " + m);
- }
-
- if (tx_mgr != null && tx_mgr.getTransaction() != null)
- {
- // we have a tx running.
log.trace("transactional so don't put stuff in the cloader
yet.");
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- switch (m.getMethodId())
+ if (ctx.isTxHasMods())
{
- case MethodDeclarations.commitMethod_id:
- if (ctx.isTxHasMods())
+ // this is a commit call.
+ if (log.isTraceEnabled()) log.trace("Calling loader.commit() for gtx
" + gtx);
+ // sync call (a write) on the loader
+ // ignore modified FQNs
+ // List fqnsModified =
getFqnsFromModificationList(tx_table.get(gtx).getCacheLoaderModifications());
+ try
+ {
+ loader.commit(gtx);
+ }
+ catch (Throwable t)
+ {
+ preparingTxs.remove(gtx);
+ throw t;
+ }
+ if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
+ {
+ Integer puts = (Integer) m_txStores.get(gtx);
+ if (puts != null)
{
- // this is a commit call.
- if (log.isTraceEnabled()) log.trace("Calling loader.commit() for
gtx " + gtx);
- // sync call (a write) on the loader
- // ignore modified FQNs
- // List fqnsModified =
getFqnsFromModificationList(tx_table.get(gtx).getCacheLoaderModifications());
- try
- {
- loader.commit(gtx);
- }
- catch (Throwable t)
- {
- preparingTxs.remove(gtx);
- throw t;
- }
- if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
- {
- Integer puts = (Integer) m_txStores.get(gtx);
- if (puts != null)
- {
- m_cacheStores = m_cacheStores + puts;
- }
- m_txStores.remove(gtx);
- }
+ m_cacheStores = m_cacheStores + puts;
}
- else
- {
- log.trace("Commit called with no modifications; ignoring.");
- }
- break;
- case MethodDeclarations.rollbackMethod_id:
- if (ctx.isTxHasMods())
- {
- // this is a rollback method
- if (preparingTxs.containsKey(gtx))
- {
- preparingTxs.remove(gtx);
- loader.rollback(gtx);
- }
- if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
- {
- m_txStores.remove(gtx);
- }
- }
- else
- {
- log.trace("Rollback called with no modifications;
ignoring.");
- }
- break;
- case MethodDeclarations.optimisticPrepareMethod_id:
- case MethodDeclarations.prepareMethod_id:
- prepareCacheLoader(gtx, isOnePhaseCommitPrepareMehod(m));
- break;
+ m_txStores.remove(gtx);
+ }
+ Object returnValue = nextInterceptor(ctx);
+ // persist additional internal state, if any, and then clean up internal
resources
+ Set<Fqn> affectedFqns = preparingTxs.remove(gtx);
+ if (affectedFqns != null)
+ {
+ storeInternalState(affectedFqns);
+ }
+ return returnValue;
+ } else
+ {
+ log.trace("Commit called with no modifications; ignoring.");
}
+ }
+ return nextInterceptor(ctx);
+ }
- // pass up the chain
- retval = super.invoke(ctx);
-
- if (m.getMethodId() == MethodDeclarations.commitMethod_id &&
ctx.isTxHasMods())
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction gtx)
throws Throwable
+ {
+ if (inTransaction())
+ {
+ log.trace("transactional so don't put stuff in the cloader
yet.");
+ if (ctx.isTxHasMods())
{
- // persist additional internal state, if any, and then clean up internal
resources
- Set<Fqn> affectedFqns = preparingTxs.remove(gtx);
- if (affectedFqns != null) storeInternalState(affectedFqns);
+ // this is a rollback method
+ if (preparingTxs.containsKey(gtx))
+ {
+ preparingTxs.remove(gtx);
+ loader.rollback(gtx);
+ }
+ if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
+ {
+ m_txStores.remove(gtx);
+ }
}
- return retval;
+ else
+ {
+ log.trace("Rollback called with no modifications; ignoring.");
+ }
}
+ return nextInterceptor(ctx);
+ }
- // if we're here we don't run in a transaction
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
+ {
+ if (inTransaction())
+ {
+ log.trace("transactional so don't put stuff in the cloader
yet.");
+ prepareCacheLoader(gtx, ctx.getMethodCall().isOnePhaseCommitPrepareMehod());
+ }
+ return nextInterceptor(ctx);
+ }
- // remove() methods need to be applied to the CacheLoader before passing up the
call: a listener might
- // access an element just removed, causing the CacheLoader to *load* the element
before *removing* it.
- // synchronized(this) {
- switch (m.getMethodId())
+ protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx,
List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
+ {
+ if (inTransaction())
{
- case MethodDeclarations.removeNodeMethodLocal_id:
- fqn = (Fqn) args[1];
- loader.remove(fqn);
- break;
- case MethodDeclarations.removeKeyMethodLocal_id:
- fqn = (Fqn) args[1];
- key = args[2];
- tmp_retval = loader.remove(fqn, key);
- use_tmp_retval = true;
- break;
- case MethodDeclarations.removeDataMethodLocal_id:
- fqn = (Fqn) args[1];
- loader.removeData(fqn);
- // we need to mark this node as data loaded
- NodeSPI n = cache.peek(fqn, false);
- if (n != null) n.setDataLoaded(true);
- break;
+ log.trace("transactional so don't put stuff in the cloader
yet.");
+ prepareCacheLoader(gtx, ctx.getMethodCall().isOnePhaseCommitPrepareMehod());
}
- // }
+ return nextInterceptor(ctx);
+ }
- retval = super.invoke(ctx);
+ /**
+ * remove() methods need to be applied to the CacheLoader before passing up the call:
a listener might
+ * access an element just removed, causing the CacheLoader to *load* the element
before *removing* it.
+ */
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ if (!inTransaction())
+ {
+ loader.remove(fqn);
+ }
+ return nextInterceptor(ctx);
+ }
- // put() methods need to be applied *after* the call
- // synchronized(this) {
- switch (m.getMethodId())
+ /**
+ * @see #handleRemoveNodeMethod(org.jboss.cache.InvocationContext,
org.jboss.cache.transaction.GlobalTransaction, org.jboss.cache.Fqn, boolean)
+ */
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
+ {
+ if (!inTransaction())
{
- case MethodDeclarations.moveMethodLocal_id:
- doMove((Fqn) args[0], (Fqn) args[1]);
- break;
- case MethodDeclarations.putDataEraseMethodLocal_id:
- // aside from this removeData call, this is the same as a putData call.
Needed since CacheLoader.put(Map)
- // does not overwrite but append.
- fqn = (Fqn) args[1];
- loader.removeData(fqn);
- // if we are erasing all the data then consider this node loaded
- NodeSPI n = cache.peek(fqn, false);
+ loader.remove(fqn, key);
+ Object returnValue = loader.remove(fqn, key);
+ nextInterceptor(ctx);
+ return returnValue;
+ }
+ return nextInterceptor(ctx);
+ }
+
+ /**
+ * @see #handleRemoveNodeMethod(org.jboss.cache.InvocationContext,
org.jboss.cache.transaction.GlobalTransaction, org.jboss.cache.Fqn, boolean)
+ */
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ if (!inTransaction())
+ {
+ loader.removeData(fqn);
+ // we need to mark this node as data loaded
+ NodeSPI n = cache.peek(fqn, false);
+ if (n != null)
+ {
n.setDataLoaded(true);
- case MethodDeclarations.putDataMethodLocal_id:
- loader.put((Fqn) args[1], (Map) args[2]);
- if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
- {
- m_cacheStores++;
- }
- break;
- case MethodDeclarations.putForExternalReadMethodLocal_id:
- case MethodDeclarations.putKeyValMethodLocal_id:
- fqn = (Fqn) args[1];
- key = args[2];
- value = args[3];
- tmp_retval = loader.put(fqn, key, value);
- use_tmp_retval = true;
- if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
- {
- m_cacheStores++;
- }
- break;
+ }
}
- // }
+ return nextInterceptor(ctx);
+ }
- if (use_tmp_retval)
+ protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
+ {
+ Object returnValue = nextInterceptor(ctx);
+ if (inTransaction())
{
- return tmp_retval;
+ return returnValue;
}
- else
+ Fqn newNodeFqn = new Fqn(to, from.getLastElement());
+ recursiveMove(from, newNodeFqn);
+ loader.remove(from);
+ return returnValue;
+ }
+
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
+ {
+ Object returnValue = nextInterceptor(ctx);
+ if (inTransaction()) {
+ return returnValue;
+ }
+ loader.removeData(fqn);
+ // if we are erasing all the data then consider this node loaded
+ NodeSPI n = cache.peek(fqn, false);
+ n.setDataLoaded(true);
+ return returnValue;
+ }
+
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
+ {
+ Object returnValue = nextInterceptor(ctx);
+ if (inTransaction())
{
- return retval;
+ return returnValue;
}
+ loader.put(fqn, data);
+ if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
+ {
+ m_cacheStores++;
+ }
+ return returnValue;
}
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
+ {
+ return handlePutKeyValue(ctx, fqn, key, value);
+ }
+
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ return handlePutKeyValue(ctx, fqn, key, value);
+ }
+
+ private Object handlePutKeyValue(InvocationContext ctx, Fqn fqn, Object key, Object
value)
+ throws Throwable
+ {
+ Object returnValue = nextInterceptor(ctx);
+ if (inTransaction())
+ {
+ return returnValue;
+ }
+ returnValue = loader.put(fqn, key, value);
+ if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
+ {
+ m_cacheStores++;
+ }
+ return returnValue;
+ }
+
+ private boolean inTransaction() throws SystemException
+ {
+ return tx_mgr != null && tx_mgr.getTransaction() != null;
+ }
+
private void storeInternalState(Set<Fqn> affectedFqns) throws Exception
{
if (cache.getConfiguration().isNodeLockingOptimistic())
@@ -253,14 +302,6 @@
}
}
- private void doMove(Fqn node, Fqn parent) throws Exception
- {
- Fqn newNodeFqn = new Fqn(parent, node.getLastElement());
-
- recursiveMove(node, newNodeFqn);
- loader.remove(node);
- }
-
private void recursiveMove(Fqn fqn, Fqn newFqn) throws Exception
{
List fqns = new ArrayList();
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -45,7 +45,7 @@
if (log.isTraceEnabled()) log.trace("Passing up method " + m + "
so it gets invoked on cache.");
try
{
- //retval = super.invoke(m);
+ //retval = nextInterceptor(m);
retval = m.invoke(cache);
}
catch (Throwable t)
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -8,12 +8,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.Node;
-import org.jboss.cache.NodeSPI;
+import org.jboss.cache.*;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.buddyreplication.GravitateResult;
import org.jboss.cache.config.Configuration;
@@ -28,14 +23,12 @@
import org.jgroups.blocks.RspFilter;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
- * The Data Gravitator interceptor intercepts cache misses and attempts t
+ * The Data Gravitator interceptor intercepts cache misses and attempts to
* gravitate data from other parts of the cluster.
* <p/>
* Only used if Buddy Replication is enabled. Also, the interceptor only kicks
@@ -63,108 +56,146 @@
syncCommunications = configuration.getCacheMode() ==
Configuration.CacheMode.REPL_SYNC || configuration.getCacheMode() ==
Configuration.CacheMode.INVALIDATION_SYNC;
}
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Log getLog()
{
- MethodCall m = ctx.getMethodCall();
- if (log.isTraceEnabled()) log.trace("Invoked with method call " + m);
+ return log;
+ }
- if (MethodDeclarations.isBlockUnblockMethod(m.getMethodId()) ||
ctx.getOptionOverrides().isSkipDataGravitation())
+ protected boolean skipMethodCall(InvocationContext ctx)
+ {
+ return MethodDeclarations.isBlockUnblockMethod(ctx.getMethodCall().getMethodId())
||
+ ctx.getOptionOverrides().isSkipDataGravitation();
+ }
+
+ protected Object handleGetChildrenNamesMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return handleGetMethod(ctx, fqn);
+ }
+
+ protected Object handleGetDataMapMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return handleGetMethod(ctx, fqn);
+ }
+
+ protected Object handleExistsMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return handleGetMethod(ctx, fqn);
+ }
+
+ protected Object handleGetKeysMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return handleGetMethod(ctx, fqn);
+ }
+
+ protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key,
boolean sendNodeEvent) throws Throwable
+ {
+ return handleGetMethod(ctx, fqn);
+ }
+
+ protected Object handleGetNodeMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return handleGetMethod(ctx, fqn);
+ }
+
+ protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx,
List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
+ {
+ try
{
- return super.invoke(ctx);
+ Object returnValue = nextInterceptor(ctx);
+ doPrepare(ctx.getGlobalTransaction(), ctx);
+ return returnValue;
+ } catch (Throwable throwable)
+ {
+ transactionMods.remove(ctx.getGlobalTransaction());
+ throw throwable;
+
}
+ }
- // Transactional lifecycle methods should be handled regardless of whether data
gravitation is enabled or not.
- if (!MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId()))
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
+ {
+ return handlePrepareMethod(ctx, gtx, modifications, address, onePhaseCommit);
+ }
+
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ try
{
- if (isGravitationEnabled(ctx) &&
MethodDeclarations.isGetMethod(m.getMethodId()))
+ transactionMods.remove(ctx.getGlobalTransaction());
+ return nextInterceptor(ctx);
+ } catch (Throwable throwable)
+ {
+ transactionMods.remove(ctx.getGlobalTransaction());
+ throw throwable;
+ }
+ }
+
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ try
+ {
+ doCommit(ctx.getGlobalTransaction(), ctx);
+ transactionMods.remove(ctx.getGlobalTransaction());
+ return nextInterceptor(ctx);
+ } catch (Throwable throwable)
+ {
+ transactionMods.remove(ctx.getGlobalTransaction());
+ throw throwable;
+ }
+ }
+
+ private Object handleGetMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ if (isGravitationEnabled(ctx))
+ {
+ // test that the Fqn being requested exists locally in the cache.
+ if (log.isTraceEnabled()) log.trace("Checking local existence of fqn "
+ fqn);
+ if (BuddyManager.isBackupFqn(fqn))
{
- // test that the Fqn being requested exists locally in the cache.
- Fqn fqn = extractFqn(m.getMethodId(), m.getArgs());
- if (log.isTraceEnabled()) log.trace("Checking local existence of fqn
" + fqn);
- if (BuddyManager.isBackupFqn(fqn))
+ log.info("Is call for a backup Fqn, not performing any gravitation.
Direct calls on internal backup nodes are *not* supported.");
+ } else
+ {
+ if (cache.peek(fqn, false) == null)
{
- log.info("Is call for a backup Fqn, not performing any gravitation.
Direct calls on internal backup nodes are *not* supported.");
- }
- else
- {
- if (cache.peek(fqn, false) == null)
+ log.trace("Gravitating from local backup tree");
+ BackupData data = localBackupGet(fqn, ctx);
+
+ if (data == null)
{
- log.trace("Gravitating from local backup tree");
- BackupData data = localBackupGet(fqn, ctx);
+ log.trace("Gravitating from remote backup tree");
+ // gravitate remotely.
+ data = remoteBackupGet(fqn);
+ }
- if (data == null)
- {
- log.trace("Gravitating from remote backup tree");
- // gravitate remotely.
- data = remoteBackupGet(fqn);
- }
+ if (data != null)
+ {
+ // create node locally so I don't gravitate again
+ // when I do the put() call to the cluster!
+ //createNode(data.backupData, true);
+ // Make sure I replicate to my buddies.
+ log.trace("Passing the put call locally to make sure state is
persisted and ownership is correctly established.");
+ createNode(data.backupData, false);
- if (data != null)
- {
- // create node locally so I don't gravitate again
- // when I do the put() call to the cluster!
- //createNode(data.backupData, true);
- // Make sure I replicate to my buddies.
- log.trace("Passing the put call locally to make sure state is
persisted and ownership is correctly established.");
- createNode(data.backupData, false);
+ // very strange, the invocation contexts get twisted up here, and will
need preservation.
+ // a bit crappy and hacky, all will be solved when we move to JBoss AOP
in 2.1.0
+ //ctx.setMethodCall(m);
- // very strange, the invocation contexts get twisted up here, and
will need preservation.
- // a bit crappy and hacky, all will be solved when we move to JBoss
AOP in 2.1.0
- //ctx.setMethodCall(m);
-
- // Clean up the other nodes
- cleanBackupData(data, ctx.getGlobalTransaction(), ctx);
- }
+ // Clean up the other nodes
+ cleanBackupData(data, ctx.getGlobalTransaction(), ctx);
}
- else
- {
- log.trace("No need to gravitate; have this already.");
- }
- }
- }
- else
- {
- if (log.isTraceEnabled())
+ } else
{
- log.trace("Suppressing data gravitation for this call.");
+ log.trace("No need to gravitate; have this already.");
}
}
- }
- else
+ } else
{
-
- try
+ if (log.isTraceEnabled())
{
- switch (m.getMethodId())
- {
- case MethodDeclarations.prepareMethod_id:
- case MethodDeclarations.optimisticPrepareMethod_id:
- Object o = super.invoke(ctx);
- doPrepare(ctx.getGlobalTransaction(), ctx);
- return o;
- case MethodDeclarations.rollbackMethod_id:
- transactionMods.remove(ctx.getGlobalTransaction());
- return super.invoke(ctx);
- case MethodDeclarations.commitMethod_id:
- doCommit(ctx.getGlobalTransaction(), ctx);
- transactionMods.remove(ctx.getGlobalTransaction());
- return super.invoke(ctx);
- }
+ log.trace("Suppressing data gravitation for this call.");
}
- catch (Throwable throwable)
- {
- transactionMods.remove(ctx.getGlobalTransaction());
- throw throwable;
- }
}
- // }
- // }
- // else
- // {
- // if (log.isTraceEnabled())
- // log.trace("Suppressing data gravitation for this
call.");
- // }
- return super.invoke(ctx);
+ return nextInterceptor(ctx);
}
private boolean isGravitationEnabled(InvocationContext ctx)
@@ -415,26 +446,6 @@
return cache.getTransactionTable().get(gtx);
}
- private Fqn extractFqn(int methodId, Object[] args)
- {
- return (Fqn) args[MethodDeclarations.isCrudMethod(methodId) ? 1 : 0];
- }
-
- @SuppressWarnings("unchecked")
- private boolean localBackupExists(Fqn<?> fqn)
- {
- boolean exists = false;
-
- for (Node node : getBackupRootCollection())
- {
- Fqn<?> newSearchFqn = new Fqn(node.getFqn(), fqn);
- exists = cache.peek(newSearchFqn, false) != null;
- if (exists) break;
- }
-
- return exists;
- }
-
private BackupData localBackupGet(Fqn fqn, InvocationContext ctx) throws
CacheException
{
GravitateResult result = cache.gravitateData(fqn, true);// a "local"
gravitation
@@ -462,13 +473,6 @@
return data;
}
- @SuppressWarnings("unchecked")
- private Collection<Node> getBackupRootCollection()
- {
- NodeSPI backupRoot = cache.peek(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, true);
- return backupRoot == null ? Collections.emptySet() :
backupRoot.getChildrenDirect();
- }
-
private static class BackupData
{
Fqn primaryFqn;
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -12,10 +12,9 @@
import org.jboss.cache.*;
import org.jboss.cache.eviction.EvictedEventNode;
import org.jboss.cache.eviction.NodeEventType;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersion;
+import org.jboss.cache.transaction.GlobalTransaction;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -24,46 +23,15 @@
* This interceptor is used to handle eviction events.
*
* @author Daniel Huang
+ * @author Mircea.Markus(a)jboss.com
* @version $Revision$
*/
-public class EvictionInterceptor extends Interceptor
+public class EvictionInterceptor extends MethodDispacherInterceptor
{
private static final Log log = LogFactory.getLog(EvictionInterceptor.class);
protected RegionManager regionManager;
- protected Map<Integer, EvictionMethodHandler> evictionMethodHandlers = new
HashMap<Integer, EvictionMethodHandler>();
- public EvictionInterceptor()
- {
- EvictionMethodHandler handler = new GetNodeEvictionMethodHandler();
- evictionMethodHandlers.put(MethodDeclarations.getNodeMethodLocal_id, handler);
- evictionMethodHandlers.put(MethodDeclarations.getDataMapMethodLocal_id, handler);
-
- handler = new GetKeyEvictionMethodHandler();
- evictionMethodHandlers.put(MethodDeclarations.getKeyValueMethodLocal_id, handler);
-
- handler = new RemoveNodeEvictionMethodHandler();
- evictionMethodHandlers.put(MethodDeclarations.removeNodeMethodLocal_id, handler);
- evictionMethodHandlers.put(MethodDeclarations.removeDataMethodLocal_id, handler);
-
- handler = new RemoveKeyEvictionMethodHandler();
- evictionMethodHandlers.put(MethodDeclarations.removeKeyMethodLocal_id, handler);
-
- handler = new PutDataEvictionMethodHandler();
- evictionMethodHandlers.put(MethodDeclarations.putDataMethodLocal_id, handler);
-
- handler = new PutDataEraseEvictionMethodHandler();
- evictionMethodHandlers.put(MethodDeclarations.putDataEraseMethodLocal_id,
handler);
-
- handler = new PutKeyEvictionMethodHandler();
- evictionMethodHandlers.put(MethodDeclarations.putKeyValMethodLocal_id, handler);
- evictionMethodHandlers.put(MethodDeclarations.putForExternalReadMethodLocal_id,
handler);
-
- handler = new PartialEvictionEvictionMethodHandler();
- evictionMethodHandlers.put(MethodDeclarations.evictNodeMethodLocal_id, handler);
- evictionMethodHandlers.put(MethodDeclarations.evictVersionedNodeMethodLocal_id,
handler);
- }
-
/**
* this method is for ease of unit testing. thus package access.
* <p/>
@@ -80,302 +48,211 @@
this.regionManager = cache.getRegionManager();
}
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Log getLog()
{
- MethodCall m = ctx.getMethodCall();
- Object ret = super.invoke(ctx);
-
- if (log.isTraceEnabled())
- {
- log.trace("Invoking EvictionInterceptor");
- }
-
- // skip the TX. this interceptor will invoke around/after the call and lock. if the
ret == null or if an exception
- // is thrown, this interceptor is terminated. there is no need for explicit
rollback logic.
- this.updateNode(m, ret);
-
-
-
- if (log.isTraceEnabled())
- {
- log.trace("Finished invoking EvictionInterceptor");
- }
-
- return ret;
+ return log;
}
- protected void updateNode(MethodCall m, Object retVal)
+ protected Object handleEvictVersionedNodeMethod(InvocationContext ctx, Fqn fqn,
DataVersion dataVersion) throws Throwable
{
- if (log.isTraceEnabled())
- {
- log.trace("Updating node/element events with no tx");
- }
-
- EvictedEventNode event = this.extractEvent(m, retVal);
- if (event == null)
- {
- // no node modifications.
- return;
- }
-
- NodeSPI<?,?> nodeSPI = cache.peek(event.getFqn(), false);
- //we do not trigger eviction events for resident nodes
- if (nodeSPI != null && nodeSPI.isResident())
- {
- return;
- }
-
- this.doEventUpdatesOnRegionManager(event);
-
- if (log.isTraceEnabled())
- {
- log.trace("Finished updating node");
- }
+ return handleEvictMethod(ctx, fqn);
}
- protected EvictedEventNode extractEvent(MethodCall m, Object retVal)
+ protected Object handleEvictMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
- EvictionMethodHandler handler = this.evictionMethodHandlers.get(m.getMethodId());
- if (handler == null)
+ Object retVal = nextInterceptor(ctx);
+ // See if the node still exists; i.e. was only data removed
+ // because it still has children.
+ // If yes, put an ADD event in the queue so the node gets revisited
+ boolean complete = (retVal != null && (Boolean) retVal);
+ if (!complete)
{
- return null;
+ if (fqn != null && !canIgnoreEvent(fqn, NodeEventType.ADD_NODE_EVENT))
+ {
+ registerEvictionEventToRegionManager(new EvictedEventNode(fqn,
NodeEventType.ADD_NODE_EVENT, 0));
+ }
}
-
- return handler.extractEvictedEventNode(m, retVal);
+ return retVal;
}
- protected boolean canIgnoreEvent(Fqn fqn, NodeEventType type)
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
{
- Region r = regionManager.getRegion(fqn, Region.Type.EVICTION, false);
- if (r == null) return true;// should never happen, we should at least get the
default region.
- return r.getEvictionPolicy().canIgnoreEvent(fqn, type);
+ return handlePutForExternalReadMethod(ctx, gtx, fqn, key, value);
}
- protected void doEventUpdatesOnRegionManager(EvictedEventNode event)
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
{
- Region region = regionManager.getRegion(event.getFqn(), false);
- region.putNodeEvent(event);
-
- if (log.isTraceEnabled())
+ Object retVal = nextInterceptor(ctx);
+ if (fqn != null && key != null && !canIgnoreEvent(fqn,
NodeEventType.ADD_ELEMENT_EVENT))
{
- log.trace("Adding event " + event + " to region at " +
region.getFqn());
+ registerEvictionEventToRegionManager( new EvictedEventNode(fqn,
NodeEventType.ADD_ELEMENT_EVENT, 1));
}
+ return retVal;
}
- protected class GetNodeEvictionMethodHandler implements EvictionMethodHandler
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
{
- public EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal)
+ Object retVal = nextInterceptor(ctx);
+ if (fqn != null && !canIgnoreEvent(fqn, NodeEventType.ADD_NODE_EVENT))
{
- if (retVal == null)
+ if (newData == null)
{
if (log.isTraceEnabled())
{
- log.trace("No event added. Node does not exist");
+ log.trace("Putting null data under fqn " + fqn +
".");
}
-
- return null;
}
-
- Object args[] = mc.getArgs();
- Fqn fqn = (Fqn) args[0];
-
- if (fqn != null && !EvictionInterceptor.this.canIgnoreEvent(fqn,
NodeEventType.VISIT_NODE_EVENT))
+ else
{
- return new EvictedEventNode(fqn, NodeEventType.VISIT_NODE_EVENT);
+ int size;
+ synchronized (newData)
+ {
+ size = newData.size();
+ }
+ EvictedEventNode event = new EvictedEventNode(fqn,
NodeEventType.ADD_NODE_EVENT, size);
+ event.setResetElementCount(eraseContents);
+ registerEvictionEventToRegionManager(event);
}
-
- return null;
}
+ return retVal;
}
- protected class GetKeyEvictionMethodHandler implements EvictionMethodHandler
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
{
- public EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal)
+ Object retVal = nextInterceptor(ctx);
+ if (fqn != null && !canIgnoreEvent(fqn, NodeEventType.ADD_NODE_EVENT))
{
- if (retVal == null)
+ if (data == null)
{
if (log.isTraceEnabled())
{
- log.trace("No event added. Element does not exist");
+ log.trace("Putting null data under fqn " + fqn +
".");
}
- return null;
}
-
- Object args[] = mc.getArgs();
- Fqn fqn = (Fqn) args[0];
- Object key = args[1];
- if (fqn != null && key != null
- && !EvictionInterceptor.this.canIgnoreEvent(fqn,
NodeEventType.VISIT_NODE_EVENT))
+ else
{
- return new EvictedEventNode(fqn, NodeEventType.VISIT_NODE_EVENT);
+ int size;
+ synchronized (data)
+ {
+ size = data.size();
+ }
+ registerEvictionEventToRegionManager(new EvictedEventNode(fqn,
NodeEventType.ADD_NODE_EVENT, size));
}
-
- return null;
}
-
+ return retVal;
}
- protected class RemoveNodeEvictionMethodHandler implements EvictionMethodHandler
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
{
- public EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal)
+ Object retVal = nextInterceptor(ctx);
+ if (retVal == null)
{
- Object args[] = mc.getArgs();
- Fqn fqn = (Fqn) args[1];
-
- if (fqn != null
- && !EvictionInterceptor.this.canIgnoreEvent(fqn,
NodeEventType.REMOVE_NODE_EVENT))
+ if (log.isTraceEnabled())
{
- return new EvictedEventNode(fqn, NodeEventType.REMOVE_NODE_EVENT);
+ log.trace("No event added. Element does not exist");
}
- return null;
}
-
+ else
+ {
+ if (fqn != null && key != null && !canIgnoreEvent(fqn,
NodeEventType.REMOVE_ELEMENT_EVENT))
+ {
+ registerEvictionEventToRegionManager(new EvictedEventNode(fqn,
NodeEventType.REMOVE_ELEMENT_EVENT, 1));
+ }
+ }
+ return retVal;
}
- protected class RemoveKeyEvictionMethodHandler implements EvictionMethodHandler
+ protected Object handleGetNodeMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
- public EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal)
+ Object retVal = nextInterceptor(ctx);
+ if (retVal == null)
{
- if (retVal == null)
+ if (log.isTraceEnabled())
{
- if (log.isTraceEnabled())
- {
- log.trace("No event added. Element does not exist");
- }
-
- return null;
+ log.trace("No event added. Node does not exist");
}
-
- Object args[] = mc.getArgs();
- Fqn fqn = (Fqn) args[1];
- Object key = args[2];
- if (fqn != null && key != null
- && !EvictionInterceptor.this.canIgnoreEvent(fqn,
NodeEventType.REMOVE_ELEMENT_EVENT))
+ } else
+ {
+ if (fqn != null && !canIgnoreEvent(fqn,
NodeEventType.VISIT_NODE_EVENT))
{
- return new EvictedEventNode(fqn, NodeEventType.REMOVE_ELEMENT_EVENT, 1);
+ registerEvictionEventToRegionManager(new EvictedEventNode(fqn,
NodeEventType.VISIT_NODE_EVENT));
}
- return null;
}
+ return retVal;
+ }
+ protected Object handleGetDataMapMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return handleGetNodeMethod(ctx, fqn);
}
- protected class PutDataEvictionMethodHandler implements EvictionMethodHandler
+ protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key,
boolean sendNodeEvent) throws Throwable
{
- public EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal)
+ Object retVal = nextInterceptor(ctx);
+ if (retVal == null)
{
- Object[] args = mc.getArgs();
- Fqn fqn = (Fqn) args[1];
- Map putData = (Map) args[2];
- if (fqn != null
- && !EvictionInterceptor.this.canIgnoreEvent(fqn,
NodeEventType.ADD_NODE_EVENT))
+ if (log.isTraceEnabled())
{
- if (putData == null)
- {
- if (log.isTraceEnabled())
- {
- log.trace("Putting null data under fqn " + fqn +
".");
- }
-
- return null;
- }
-
- int size;
- synchronized (putData)
- {
- size = putData.size();
- }
-
- return new EvictedEventNode(fqn, NodeEventType.ADD_NODE_EVENT, size);
+ log.trace("No event added. Element does not exist");
}
-
- return null;
}
-
+ else if (fqn != null && key != null && !canIgnoreEvent(fqn,
NodeEventType.VISIT_NODE_EVENT))
+ {
+ registerEvictionEventToRegionManager(new EvictedEventNode(fqn,
NodeEventType.VISIT_NODE_EVENT));
+ }
+ return retVal;
}
- protected class PutDataEraseEvictionMethodHandler implements EvictionMethodHandler
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
{
- public EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal)
+ Object retVal = nextInterceptor(ctx);
+ if (fqn != null && !canIgnoreEvent(fqn, NodeEventType.REMOVE_NODE_EVENT))
{
- Object[] args = mc.getArgs();
- Fqn fqn = (Fqn) args[1];
- Map putData = (Map) args[2];
- Boolean resetElementCount = (Boolean) args[4];
- if (fqn != null
- && !EvictionInterceptor.this.canIgnoreEvent(fqn,
NodeEventType.ADD_NODE_EVENT))
- {
- if (putData == null)
- {
- if (log.isTraceEnabled())
- {
- log.trace("Putting null data under fqn " + fqn +
".");
- }
-
- return null;
- }
-
- int size;
- synchronized (putData)
- {
- size = putData.size();
- }
-
- EvictedEventNode event = new EvictedEventNode(fqn,
NodeEventType.ADD_NODE_EVENT, size);
- event.setResetElementCount(resetElementCount);
- return event;
- }
-
- return null;
+ registerEvictionEventToRegionManager(new EvictedEventNode(fqn,
NodeEventType.REMOVE_NODE_EVENT));
}
+ return retVal;
}
- protected class PutKeyEvictionMethodHandler implements EvictionMethodHandler
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
{
+ return handleRemoveNodeMethod(ctx, tx, fqn, createUndoOps);
+ }
- public EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal)
+ private void registerEvictionEventToRegionManager(EvictedEventNode event)
+ {
+ if (event == null)
{
- Object[] args = mc.getArgs();
- Fqn fqn = (Fqn) args[1];
- Object key = args[2];
- if (fqn != null && key != null
- && !EvictionInterceptor.this.canIgnoreEvent(fqn,
NodeEventType.ADD_ELEMENT_EVENT))
- {
- return new EvictedEventNode(fqn, NodeEventType.ADD_ELEMENT_EVENT, 1);
- }
+ // no node modifications.
+ return;
+ }
- return null;
+ NodeSPI<?,?> nodeSPI = cache.peek(event.getFqn(), false);
+ //we do not trigger eviction events for resident nodes
+ if (nodeSPI != null && nodeSPI.isResident())
+ {
+ return;
}
- }
+ Region region = regionManager.getRegion(event.getFqn(), false);
+ region.putNodeEvent(event);
- protected class PartialEvictionEvictionMethodHandler implements EvictionMethodHandler
- {
- public EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal)
+ if (log.isTraceEnabled())
{
- // See if the node still exists; i.e. was only data removed
- // because it still has children.
- // If yes, put an ADD event in the queue so the node gets revisited
+ log.trace("Adding event " + event + " to region at " +
region.getFqn());
+ }
- boolean complete = (retVal != null && (Boolean) retVal);
- if (!complete)
- {
- Object[] args = mc.getArgs();
- Fqn fqn = (Fqn) args[0];
- if (fqn != null
- && !EvictionInterceptor.this.canIgnoreEvent(fqn,
NodeEventType.ADD_NODE_EVENT))
- {
- return new EvictedEventNode(fqn, NodeEventType.ADD_NODE_EVENT, 0);
- }
- }
- return null;
+ if (log.isTraceEnabled())
+ {
+ log.trace("Finished updating node");
}
}
- protected interface EvictionMethodHandler
+ protected boolean canIgnoreEvent(Fqn fqn, NodeEventType type)
{
- EvictedEventNode extractEvictedEventNode(MethodCall mc, Object retVal);
+ Region r = regionManager.getRegion(fqn, Region.Type.EVICTION, false);
+ // should never happen, we should at least get the default region.
+ return r == null || r.getEvictionPolicy().canIgnoreEvent(fqn, type);
}
-
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -27,8 +27,6 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodDeclarations;
import javax.transaction.Status;
import javax.transaction.SystemException;
@@ -38,7 +36,7 @@
/**
* Class representing an interceptor.
- * <em>Note that this will be replaced by {@link org.jboss.aop.advice.Interceptor}
in one of the next releases</em>
+ * <em>Note that this will be replaced by {@link org.jboss.aop.advice.Interceptor}
in one of the nextInterceptor releases</em>
*
* @author Bela Ban
* @version $Id$
@@ -72,11 +70,24 @@
this.configuration = cache.getConfiguration();
}
+ /**
+ * Using this method call for forwarding a call in the chain is not redable and error
prone in the case of interceptors
+ * extending other interceptors. This metod rather refers to interceptor doing its
business operations rather than
+ * delegating to the nextInterceptor interceptor in chain. For delegation please use
{@link #nextInterceptor(org.jboss.cache.InvocationContext)}
+ */
public Object invoke(InvocationContext ctx) throws Throwable
{
return next.invoke(ctx);
}
+ /**
+ * Forwards the call to the nextInterceptor interceptor in the chain.
+ */
+ public Object nextInterceptor(InvocationContext ctx) throws Throwable
+ {
+ return next.invoke(ctx);
+ }
+
public boolean getStatisticsEnabled()
{
return statsEnabled;
@@ -163,25 +174,7 @@
return isActive(tx) || isPreparing(tx);
}
- /**
- * This only works for prepare() and optimisticPrepare() method calls.
- *
- * @param m
- */
- protected boolean isOnePhaseCommitPrepareMehod(MethodCall m)
- {
- switch (m.getMethodId())
- {
- case MethodDeclarations.prepareMethod_id:
- return (Boolean) m.getArgs()[3];
- case MethodDeclarations.optimisticPrepareMethod_id:
- return (Boolean) m.getArgs()[4];
- default:
- return false;
- }
- }
-
- /**
+ /**
* Tests whether the caller is in a valid transaction. If not, will throw a
CacheException.
*/
protected void assertTransactionValid(InvocationContext ctx)
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -21,6 +21,8 @@
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import org.apache.commons.logging.Log;
+import org.jgroups.Address;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
@@ -59,110 +61,189 @@
if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction,
List<MethodCall>>();
}
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Log getLog()
{
- MethodCall m = ctx.getMethodCall();
- Option optionOverride = ctx.getOptionOverrides();
- if (optionOverride != null && optionOverride.isCacheModeLocal() &&
(ctx.getTransaction() == null ||
MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId())))
+ return log;
+ }
+
+ protected boolean skipMethodCall(InvocationContext ctx)
+ {
+ Option optionOverride = ctx.getOptionOverrides();
+ if (optionOverride != null && optionOverride.isCacheModeLocal() &&
(ctx.getTransaction() == null ||
+
MethodDeclarations.isTransactionLifecycleMethod(ctx.getMethodCall().getMethodId())))
{
// skip replication!!
- return super.invoke(ctx);
+ return true;
}
+ if (log.isTraceEnabled()) log.trace("(" + cache.getLocalAddress() +
") method call " + ctx.getMethodCall());
+ return false;
+ }
- Transaction tx = ctx.getTransaction();
- Object retval = super.invoke(ctx);
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
- if (log.isTraceEnabled()) log.trace("(" + cache.getLocalAddress() +
") method call " + m);
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
- // now see if this is a CRUD method:
- if (MethodDeclarations.isCrudMethod(m.getMethodId()))
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
+
+ protected Object handlePutDataEraseVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Map data, boolean createUndoOps, boolean eraseContent,
DataVersion dv) throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
+
+ protected Object handlePutDataVersionedMethod(InvocationContext ctx, GlobalTransaction
globalTransaction, Fqn fqn, Map map, Boolean createUndoOps, DataVersion dataVersion)
throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
+
+ protected Object handlePutKeyValueVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, boolean createUndoOps,
DataVersion dv) throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
+
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
+
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx,fqn, null);
+ }
+
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
+
+ protected Object handleDataGravitationCleanupMethod(InvocationContext ctx,
GlobalTransaction globalTransaction, Fqn primary, Fqn backup) throws Throwable
+ {
+ return handleCrudMethod(ctx, primary, null);
+ }
+
+ protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
+ {
+ return handleCrudMethod(ctx, to, from);
+ }
+
+ protected Object handleRemoveKeyVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, boolean createUndoOps, DataVersion dv) throws
Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
+
+ protected Object handleRemoveNodeVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, boolean createUndoOps, DataVersion dv) throws Throwable
+ {
+ return handleCrudMethod(ctx, fqn, null);
+ }
+
+ protected Object handleRemoveDataVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, boolean createUndoOps, DataVersion dv) throws Throwable
+ {
+ return handleCrudMethod(ctx,fqn, null);
+ }
+
+ protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx,
List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
+ {
+ Object retval = nextInterceptor(ctx);
+ Transaction tx = ctx.getTransaction();
+ if (tx != null && !optimistic)
{
- if (m.getMethodId() != MethodDeclarations.putForExternalReadMethodLocal_id
&& m.getMethodId() !=
MethodDeclarations.putForExternalReadVersionedMethodLocal_id)
+ log.debug("Entering InvalidationInterceptor's prepare phase");
+ // fetch the modifications before the transaction is committed (and thus removed
from the txTable)
+ gtx = ctx.getGlobalTransaction();
+ TransactionEntry entry = txTable.get(gtx);
+ if (entry == null) throw new IllegalStateException("cannot find transaction
entry for " + gtx);
+ List<MethodCall> modifications = new
LinkedList<MethodCall>(entry.getModifications());
+ if (modifications.size() > 0)
{
- if (log.isDebugEnabled()) log.debug("Is a CRUD method");
- Set<Fqn> fqns = new HashSet<Fqn>();
- findAndAddFqns(m.getArgs(), fqns, m.getMethodId() ==
MethodDeclarations.moveMethodLocal_id);
- if (!fqns.isEmpty())
- {
- // could be potentially TRANSACTIONAL. Ignore if it is, until we see a
prepare().
- if (tx == null || !isValid(tx))
- {
- // the no-tx case:
- //replicate an evict call.
- for (Fqn fqn : fqns) invalidateAcrossCluster(fqn, null,
isSynchronous(optionOverride), ctx);
- }
- }
+ broadcastInvalidate(modifications, gtx, tx, ctx);
}
else
{
- log.debug("Encountered a putForExternalRead() - is a no op.");
+ log.debug("Nothing to invalidate - no modifications in the
transaction.");
}
}
- else
+ return retval;
+ }
+
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
+ {
+ Object retval = nextInterceptor(ctx);
+ Transaction tx = ctx.getTransaction();
+ if (tx != null)
{
- // not a CRUD method - lets see if it is a tx lifecycle method.
- if (tx != null)
+ // here we just record the modifications but actually do the invalidate in
commit.
+ gtx = ctx.getGlobalTransaction();
+ TransactionEntry entry = txTable.get(gtx);
+ if (entry == null) throw new IllegalStateException("cannot find transaction
entry for " + gtx);
+ modifications = new LinkedList<MethodCall>(entry.getModifications());
+ if (modifications.size() > 0)
{
+ txMods.put(gtx, modifications);
+ }
+ }
+ return retval;
+ }
- GlobalTransaction gtx;
- TransactionEntry entry;
- List<MethodCall> modifications;
- // lets see if we are in the prepare phase (as this is the only time we
actually do anything)
- switch (m.getMethodId())
- {
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ Object retval = nextInterceptor(ctx);
+ Transaction tx = ctx.getTransaction();
+ if (tx !=null && optimistic)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ List modifications = txMods.remove(gtx);
+ broadcastInvalidate(modifications, gtx, tx, ctx);
+ log.debug("Committing. Broadcasting invalidations.");
+ }
+ return retval;
+ }
- case MethodDeclarations.prepareMethod_id:
- if (!optimistic)
- {
- log.debug("Entering InvalidationInterceptor's prepare
phase");
- // fetch the modifications before the transaction is committed (and
thus removed from the txTable)
- gtx = ctx.getGlobalTransaction();
- entry = txTable.get(gtx);
- if (entry == null) throw new IllegalStateException("cannot find
transaction entry for " + gtx);
- modifications = new
LinkedList<MethodCall>(entry.getModifications());
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ Object retval = nextInterceptor(ctx);
+ Transaction tx = ctx.getTransaction();
+ if (tx !=null && optimistic)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ txMods.remove(gtx);
+ log.debug("Caught a rollback. Clearing modification in txMods");
+ }
+ return retval;
+ }
- if (modifications.size() > 0)
- {
- broadcastInvalidate(modifications, gtx, tx, ctx);
- }
- else
- {
- log.debug("Nothing to invalidate - no modifications in the
transaction.");
- }
- }
- break;
- case MethodDeclarations.optimisticPrepareMethod_id:
- // here we just record the modifications but actually do the invalidate
in commit.
- gtx = ctx.getGlobalTransaction();
- entry = txTable.get(gtx);
- if (entry == null) throw new IllegalStateException("cannot find
transaction entry for " + gtx);
- modifications = new
LinkedList<MethodCall>(entry.getModifications());
-
- if (modifications.size() > 0)
- {
- txMods.put(gtx, modifications);
- }
- break;
- case MethodDeclarations.commitMethod_id:
- if (optimistic)
- {
- gtx = ctx.getGlobalTransaction();
- modifications = txMods.remove(gtx);
- broadcastInvalidate(modifications, gtx, tx, ctx);
- log.debug("Committing. Broadcasting invalidations.");
- }
- break;
- case MethodDeclarations.rollbackMethod_id:
- if (optimistic)
- {
- gtx = ctx.getGlobalTransaction();
- txMods.remove(gtx);
- log.debug("Caught a rollback. Clearing modification in
txMods");
- }
- break;
- }
+ /**
+ * @param from is only present for move operations, else pass it in as null
+ */
+ private Object handleCrudMethod(InvocationContext ctx, Fqn targetFqn, Fqn from)
+ throws Throwable
+ {
+ Object retval = nextInterceptor(ctx);
+ Transaction tx = ctx.getTransaction();
+ Option optionOverride = ctx.getOptionOverrides();
+ if (log.isDebugEnabled()) log.debug("Is a CRUD method");
+ Set<Fqn> fqns = new HashSet<Fqn>();
+ if (from != null)
+ {
+ fqns.add(from);
+ }
+ fqns.add(targetFqn);
+ if (!fqns.isEmpty())
+ {
+ // could be potentially TRANSACTIONAL. Ignore if it is, until we see a
prepare().
+ if (tx == null || !isValid(tx))
+ {
+ // the no-tx case:
+ //replicate an evict call.
+ for (Fqn fqn : fqns) invalidateAcrossCluster(fqn, null,
isSynchronous(optionOverride), ctx);
}
-
}
return retval;
}
@@ -293,6 +374,7 @@
}
@SuppressWarnings("unchecked")
+ @Deprecated
protected void findAndAddFqns(Object[] objects, Set<Fqn> fqns, boolean isMove)
{
if (isMove)
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -11,6 +11,7 @@
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.transaction.GlobalTransaction;
+import org.apache.commons.logging.Log;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
@@ -22,6 +23,11 @@
*/
public class InvocationContextInterceptor extends BaseTransactionalContextInterceptor
implements InvocationContextInterceptorMBean
{
+ protected Log getLog()
+ {
+ return log;
+ }
+
public Object invoke(InvocationContext ctx) throws Throwable
{
MethodCall call = ctx.getMethodCall();
@@ -59,7 +65,7 @@
}
}
- Object retval = super.invoke(ctx);
+ Object retval = nextInterceptor(ctx);
// assume we're the first interceptor in the chain. Handle the
exception-throwing.
if (retval instanceof Throwable)
{
Added:
core/trunk/src/main/java/org/jboss/cache/interceptors/MethodDispacherInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/MethodDispacherInterceptor.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/MethodDispacherInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -0,0 +1,540 @@
+package org.jboss.cache.interceptors;
+
+import org.apache.commons.logging.Log;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Node;
+import org.jboss.cache.lock.NodeLock;
+import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersion;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jgroups.Address;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * The purpose of this interceptor is to supply a nicer way of handling the interception
of desired methods:
+ * <pre>
+ * - one can regiter to be notified on an particular method call by extending
handle<i>CalledMethod</i> method.
+ * This would result in removal of switch statemenets in invoke
+ * - the parameters of the hendler methods are passes in strongly typed, rather than
as an array of Objects
+ * </pre>
+ * This interceptor acts as a switch that delegates method calls to handlers/methods.
+ *
+ * Implementation notes:
+ * Current implementation checks to see the methods that are overwritten and does only
perform calls to those methods.
+ * This is for avoiding the casts needed to build parameter list. If a non overwritten
method is invoked,
+ * then next interceptor will be called.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @version 2.2
+ * todo - gtx is contained in InvocationContext. Check wheter passing method
calls is or isn't redundant
+ * todo - check wheter is possible to group methods, e.g.
MethodDeclarations.putMethods, treansationableMethods?
+ * todo - Refactor stuff in pessimistic lock interceptor
+ */
+public abstract class MethodDispacherInterceptor extends Interceptor
+{
+ /**
+ * List of the method the extending interceptor pverwrites. It is only those methods
that will be called.
+ */
+ private Set<Integer> overwrittenMethods = new TreeSet<Integer>();
+
+ protected MethodDispacherInterceptor()
+ {
+ processOverwritternMethods();
+ }
+
+ /**
+ * Builds the list of methods that are overwiritten.
+ */
+ private void processOverwritternMethods()
+ {
+ checkIfOverwritten(MethodDeclarations.putDataEraseMethodLocal_id,
"handlePutDataEraseMethod",InvocationContext.class, GlobalTransaction.class,
Fqn.class, Map.class, boolean.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.putDataMethodLocal_id,
"handlePutDataMethod", InvocationContext.class, GlobalTransaction.class,
Fqn.class, Map.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.putForExternalReadMethodLocal_id,
"handlePutForExternalReadMethod",InvocationContext.class,
GlobalTransaction.class, Fqn.class, Object.class, Object.class);
+ checkIfOverwritten(MethodDeclarations.putKeyValMethodLocal_id,
"handlePutKeyValueMethod", InvocationContext.class, GlobalTransaction.class,
Fqn.class, Object.class, Object.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.moveMethodLocal_id,
"handleMoveMethod",InvocationContext.class, Fqn.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.addChildMethodLocal_id,
"handleAddChildMethod",InvocationContext.class, GlobalTransaction.class,
Fqn.class, Object.class, Node.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.getKeyValueMethodLocal_id,
"handleGetKeyValueMethod", InvocationContext.class, Fqn.class, Object.class,
boolean.class);
+ checkIfOverwritten(MethodDeclarations.getNodeMethodLocal_id,
"handleGetNodeMethod", InvocationContext.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.getChildrenNamesMethodLocal_id,
"handleGetChildrenNamesMethod",InvocationContext.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.releaseAllLocksMethodLocal_id,
"handleReleaseAllLocksMethod",InvocationContext.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.printMethodLocal_id,
"handlePrintMethod",InvocationContext.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.getKeysMethodLocal_id,
"handleGetKeysMethod", InvocationContext.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.getDataMapMethodLocal_id,
"handleGetDataMapMethod", InvocationContext.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.rollbackMethod_id,
"handleRollbackMethod", InvocationContext.class, GlobalTransaction.class);
+ checkIfOverwritten(MethodDeclarations.removeNodeMethodLocal_id,
"handleRemoveNodeMethod", InvocationContext.class, GlobalTransaction.class,
Fqn.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.removeKeyMethodLocal_id,
"handleRemoveKeyMethod",InvocationContext.class, GlobalTransaction.class,
Fqn.class, Object.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.removeDataMethodLocal_id,
"handleRemoveDataMethod", InvocationContext.class, GlobalTransaction.class,
Fqn.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.commitMethod_id,
"handleCommitMethod",InvocationContext.class, GlobalTransaction.class);
+ checkIfOverwritten(MethodDeclarations.optimisticPrepareMethod_id,
"handleOptimisticPrepareMethod", InvocationContext.class,
GlobalTransaction.class, List.class, Map.class, Address.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.prepareMethod_id,
"handlePrepareMethod", InvocationContext.class, GlobalTransaction.class,
List.class, Address.class, boolean.class);
+ checkIfOverwritten(MethodDeclarations.evictNodeMethodLocal_id,
"handleEvictMethod", InvocationContext.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.evictVersionedNodeMethodLocal_id,
"handleEvictVersionedNodeMethod", InvocationContext.class, Fqn.class,
DataVersion.class);
+ checkIfOverwritten(MethodDeclarations.existsMethod_id,
"handleExistsMethod", InvocationContext.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.putDataEraseVersionedMethodLocal_id,
"handlePutDataEraseVersionedMethod",InvocationContext.class,
GlobalTransaction.class, Fqn.class, Map.class, boolean.class, boolean.class,
DataVersion.class);
+ checkIfOverwritten(MethodDeclarations.putDataVersionedMethodLocal_id,
"handlePutDataVersionedMethod", InvocationContext.class,
GlobalTransaction.class, Fqn.class, Map.class, Boolean.class, DataVersion.class);
+ checkIfOverwritten(MethodDeclarations.putKeyValVersionedMethodLocal_id,
"handlePutKeyValueVersionedMethod", InvocationContext.class,
GlobalTransaction.class, Fqn.class, Object.class, Object.class, boolean.class,
DataVersion.class);
+ checkIfOverwritten(MethodDeclarations.putForExternalReadVersionedMethodLocal_id,
"handlePutForExternalReadVersionedMethod", InvocationContext.class,
GlobalTransaction.class, Fqn.class, Object.class, Object.class, DataVersion.class);
+ checkIfOverwritten(MethodDeclarations.dataGravitationCleanupMethod_id,
"handleDataGravitationCleanupMethod", InvocationContext.class,
GlobalTransaction.class, Fqn.class, Fqn.class);
+ checkIfOverwritten(MethodDeclarations.removeNodeVersionedMethodLocal_id,
"handleRemoveNodeVersionedMethod", InvocationContext.class,
GlobalTransaction.class, Fqn.class, boolean.class, DataVersion.class);
+ checkIfOverwritten(MethodDeclarations.removeKeyVersionedMethodLocal_id,
"handleRemoveKeyVersionedMethod",InvocationContext.class,
GlobalTransaction.class, Fqn.class, Object.class, boolean.class, DataVersion.class);
+ checkIfOverwritten(MethodDeclarations.removeDataVersionedMethodLocal_id,
"handleRemoveDataVersionedMethod", InvocationContext.class,
GlobalTransaction.class, Fqn.class, boolean.class, DataVersion.class);
+ checkIfOverwritten(MethodDeclarations.blockChannelMethodLocal_id,
"handleBlockChannelMethod",InvocationContext.class);
+ checkIfOverwritten(MethodDeclarations.unblockChannelMethodLocal_id,
"handleUnblockChannelMethod", InvocationContext.class);
+ checkIfOverwritten(MethodDeclarations.lockMethodLocal_id,
"handleLockMethod", InvocationContext.class, Fqn.class, NodeLock.LockType.class,
boolean.class);
+
+ }
+
+ private void checkIfOverwritten(int putDataEraseMethodLocal_id, String methodName,
Class... args)
+ {
+ Class currentClass = getClass();
+ //if this is a > 1 inheritace deepth and the method was overwritten in the
parent. We also have to look into parents
+ while (currentClass != MethodDispacherInterceptor.class)
+ {
+ try
+ {
+ currentClass.getDeclaredMethod(methodName, args);
+ this.overwrittenMethods.add(putDataEraseMethodLocal_id);
+ } catch (NoSuchMethodException e)
+ {
+ //ignore
+ }
+ currentClass = (Class) currentClass.getGenericSuperclass();
+ }
+ }
+
+ /**
+ * Acts like a 'switch case' that delegates the call to the appropriate
method.
+ */
+ public Object invoke(InvocationContext ctx) throws Throwable
+ {
+ if (getLog() != null && getLog().isTraceEnabled())
+ {
+ log.trace("Invoked with method call " + ctx.getMethodCall());
+ }
+ if (skipMethodCall(ctx))
+ {
+ return nextInterceptor(ctx);
+ }
+ MethodCall m = ctx.getMethodCall();
+ if (!overwrittenMethods.contains(m.getMethodId()))
+ {
+ return nextInterceptor(ctx);
+ }
+ Object[] args = m.getArgs();
+ Object result;
+ switch (m.getMethodId())
+ {
+ case MethodDeclarations.putDataEraseMethodLocal_id:
+ result = handlePutDataEraseMethod(ctx, (GlobalTransaction) args[0], (Fqn)
args[1], (Map) args[2], (Boolean) args[3], (Boolean) args[4]);
+ break;
+ case MethodDeclarations.putDataMethodLocal_id:
+ result = handlePutDataMethod(ctx, (GlobalTransaction) args[0], (Fqn) args[1],
(Map) args[2], (Boolean) args[3]);
+ break;
+ case MethodDeclarations.putForExternalReadMethodLocal_id:
+ result = handlePutForExternalReadMethod(ctx, (GlobalTransaction) args[0],
(Fqn) args[1], args[2], args[3]);
+ break;
+ case MethodDeclarations.putKeyValMethodLocal_id:
+ result = handlePutKeyValueMethod(ctx, (GlobalTransaction) args[0], (Fqn)
args[1], args[2], args[3], (Boolean) args[4]);
+ break;
+ case MethodDeclarations.moveMethodLocal_id:
+ result = handleMoveMethod(ctx, (Fqn) args[0], (Fqn) args[1]);
+ break;
+ case MethodDeclarations.addChildMethodLocal_id:
+ result = handleAddChildMethod(ctx, (GlobalTransaction) args[0], (Fqn)
args[1], args[2], (Node) args[3], (Boolean) args[4]);
+ break;
+ case MethodDeclarations.getKeyValueMethodLocal_id:
+ result = handleGetKeyValueMethod(ctx, (Fqn) args[0], args[1], (Boolean)
args[2]);
+ break;
+ case MethodDeclarations.getNodeMethodLocal_id:
+ result = handleGetNodeMethod(ctx, (Fqn) args[0]);
+ break;
+ case MethodDeclarations.getChildrenNamesMethodLocal_id:
+ result = handleGetChildrenNamesMethod(ctx, (Fqn) args[0]);
+ break;
+ case MethodDeclarations.releaseAllLocksMethodLocal_id:
+ result = handleReleaseAllLocksMethod(ctx, (Fqn) args[0]);
+ break;
+ case MethodDeclarations.printMethodLocal_id:
+ result = handlePrintMethod(ctx, (Fqn) args[0]);
+ break;
+ case MethodDeclarations.getKeysMethodLocal_id:
+ result = handleGetKeysMethod(ctx, (Fqn) args[0]);
+ break;
+ case MethodDeclarations.getDataMapMethodLocal_id:
+ result = handleGetDataMapMethod(ctx, (Fqn) args[0]);
+ break;
+ case MethodDeclarations.rollbackMethod_id:
+ result = handleRollbackMethod(ctx, (GlobalTransaction) args[0]);
+ break;
+ case MethodDeclarations.removeNodeMethodLocal_id:
+ result = handleRemoveNodeMethod(ctx, (GlobalTransaction) args[0], (Fqn)
args[1], (Boolean) args[2]);
+ break;
+ case MethodDeclarations.removeKeyMethodLocal_id:
+ result = handleRemoveKeyMethod(ctx, (GlobalTransaction) args[0], (Fqn)
args[1], args[2], (Boolean) args[3]);
+ break;
+ case MethodDeclarations.removeDataMethodLocal_id:
+ result = handleRemoveDataMethod(ctx, (GlobalTransaction) args[0], (Fqn)
args[1], (Boolean) args[2]);
+ break;
+ case MethodDeclarations.commitMethod_id:
+ result = handleCommitMethod(ctx, (GlobalTransaction) args[0]);
+ break;
+ case MethodDeclarations.optimisticPrepareMethod_id:
+ result = handleOptimisticPrepareMethod(ctx, (GlobalTransaction) args[0],
(List)args[1],(Map)args[2], (Address)args[3], (Boolean)args[4]);
+ break;
+ case MethodDeclarations.prepareMethod_id:
+ result = handlePrepareMethod(ctx, (GlobalTransaction) args[0], (List)args[1],
(Address)args[2], (Boolean)args[3]);
+ break;
+ case MethodDeclarations.evictNodeMethodLocal_id:
+ result = handleEvictMethod(ctx, (Fqn)args[0]);
+ break;
+ case MethodDeclarations.evictVersionedNodeMethodLocal_id:
+ result = handleEvictVersionedNodeMethod(ctx, (Fqn)args[0],
(DataVersion)args[1]);
+ break;
+ case MethodDeclarations.existsMethod_id:
+ result = handleExistsMethod(ctx, (Fqn)args[0]);
+ break;
+ case MethodDeclarations.putDataEraseVersionedMethodLocal_id:
+ result = handlePutDataEraseVersionedMethod(ctx, (GlobalTransaction) args[0],
(Fqn)args[1], (Map)args[2], (Boolean)args[3], (Boolean)args[4], (DataVersion)args[5]);
+ break;
+ case MethodDeclarations.putDataVersionedMethodLocal_id:
+ result = handlePutDataVersionedMethod(ctx, (GlobalTransaction) args[0],
(Fqn)args[1], (Map)args[2], (Boolean)args[3], (DataVersion)args[4]);
+ break;
+ case MethodDeclarations.putKeyValVersionedMethodLocal_id:
+ result = handlePutKeyValueVersionedMethod(ctx, (GlobalTransaction) args[0],
(Fqn)args[1], args[2], args[3], (Boolean)args[4], (DataVersion)args[5]);
+ break;
+ case MethodDeclarations.putForExternalReadVersionedMethodLocal_id:
+ result = handlePutForExternalReadVersionedMethod(ctx, (GlobalTransaction)
args[0], (Fqn)args[1], args[2], args[3], (DataVersion)args[5]);
+ break;
+ case MethodDeclarations.dataGravitationCleanupMethod_id:
+ result = handleDataGravitationCleanupMethod(ctx, (GlobalTransaction) args[0],
(Fqn)args[1], (Fqn)args[2]);
+ break;
+ case MethodDeclarations.removeNodeVersionedMethodLocal_id:
+ result = handleRemoveNodeVersionedMethod(ctx, (GlobalTransaction) args[0],
(Fqn)args[1], (Boolean)args[2], (DataVersion)args[3]);
+ break;
+ case MethodDeclarations.removeKeyVersionedMethodLocal_id:
+ result = handleRemoveKeyVersionedMethod(ctx, (GlobalTransaction) args[0],
(Fqn)args[1], args[2], (Boolean)args[3], (DataVersion)args[4]);
+ break;
+ case MethodDeclarations.removeDataVersionedMethodLocal_id:
+ result = handleRemoveDataVersionedMethod(ctx, (GlobalTransaction) args[0],
(Fqn)args[1], (Boolean)args[2], (DataVersion)args[3]);
+ break;
+ case MethodDeclarations.blockChannelMethodLocal_id:
+ result = handleBlockChannelMethod(ctx);
+ break;
+ case MethodDeclarations.unblockChannelMethodLocal_id:
+ result = handleUnblockChannelMethod(ctx);
+ break;
+ case MethodDeclarations.lockMethodLocal_id:
+ result = handleLockMethod(ctx, (Fqn)args[0], (NodeLock.LockType)args[1],
(Boolean)args[2]);
+ break;
+ default:
+ return nextInterceptor(ctx);
+ }
+ return result;
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_lock(org.jboss.cache.Fqn,
org.jboss.cache.lock.NodeLock.LockType, boolean)}
+ */
+ protected Object handleLockMethod(InvocationContext ctx, Fqn fqn, NodeLock.LockType
lockType, boolean recursive) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_unblock()}
+ */
+ protected Object handleUnblockChannelMethod(InvocationContext ctx) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_block()}
+ */
+ protected Object handleBlockChannelMethod(InvocationContext ctx) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_removeData(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, boolean, org.jboss.cache.optimistic.DataVersion)}
+ */
+ protected Object handleRemoveDataVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, boolean createUndoOps, DataVersion dv) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_remove(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, Object, boolean, org.jboss.cache.optimistic.DataVersion)}
+ */
+ protected Object handleRemoveKeyVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, boolean createUndoOps, DataVersion dv) throws
Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_remove(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, boolean, org.jboss.cache.optimistic.DataVersion)}
+ */
+ protected Object handleRemoveNodeVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, boolean createUndoOps, DataVersion dv) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_dataGravitationCleanup(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, org.jboss.cache.Fqn)}
+ */
+ protected Object handleDataGravitationCleanupMethod(InvocationContext ctx,
GlobalTransaction globalTransaction, Fqn primary, Fqn backup) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_putForExternalRead(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, Object, Object, org.jboss.cache.optimistic.DataVersion)}
+ */
+ protected Object handlePutForExternalReadVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, DataVersion dv) throws
Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_put(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, Object, Object, boolean, org.jboss.cache.optimistic.DataVersion)}
+ */
+ protected Object handlePutKeyValueVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, boolean createUndoOps,
DataVersion dv) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_put(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, java.util.Map, boolean, org.jboss.cache.optimistic.DataVersion)}
+ */
+ protected Object handlePutDataVersionedMethod(InvocationContext ctx, GlobalTransaction
globalTransaction, Fqn fqn, Map map, Boolean createUndoOps, DataVersion dataVersion)
throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_put(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, java.util.Map, boolean, boolean,
org.jboss.cache.optimistic.DataVersion)}
+ */
+ protected Object handlePutDataEraseVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Map data, boolean createUndoOps, boolean eraseContent,
DataVersion dv) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#exists(String)}
+ */
+ protected Object handleExistsMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * used for logging various steps. if null is returned than nothing is logged.
+ */
+ protected abstract Log getLog();
+
+ /**
+ * Each interceptor should extend this if it does not need any processing for current
call.
+ * An sample usage would be: this interceptor is only interested if thre is one
transaction going on. If so all
+ * handleXYZ would know that we have a transaction going and would not check its
state.
+ */
+ protected boolean skipMethodCall(InvocationContext ctx)
+ {
+ return false;
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_evict(org.jboss.cache.Fqn,
org.jboss.cache.optimistic.DataVersion)}
+ */
+ protected Object handleEvictVersionedNodeMethod(InvocationContext ctx, Fqn fqn,
DataVersion dataVersion) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#evict(org.jboss.cache.Fqn)}
+ */
+ protected Object handleEvictMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#prepare(org.jboss.cache.transaction.GlobalTransaction,
java.util.List, org.jgroups.Address, boolean)}
+ */
+ protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx,
List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#optimisticPrepare(org.jboss.cache.transaction.GlobalTransaction,
java.util.List, java.util.Map, org.jgroups.Address, boolean)}
+ */
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#commit(org.jboss.cache.transaction.GlobalTransaction)}
+ */
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_removeData(org.jboss.cache.transaction.GlobalTransaction, Fqn,
boolean)}
+ */
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_remove(org.jboss.cache.transaction.GlobalTransaction, String,
Object, boolean)}
+ */
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_remove(org.jboss.cache.transaction.GlobalTransaction, String,
boolean)}
+ */
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#rollback(org.jboss.cache.transaction.GlobalTransaction)}
+ */
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_getData(org.jboss.cache.Fqn)}
+ */
+ protected Object handleGetDataMapMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#getKeys(Fqn)}
+ */
+ protected Object handleGetKeysMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_print(org.jboss.cache.Fqn)}
+ */
+ protected Object handlePrintMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_releaseAllLocks(org.jboss.cache.Fqn)}
+ */
+ protected Object handleReleaseAllLocksMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_getChildrenNames(org.jboss.cache.Fqn)}
+ */
+ protected Object handleGetChildrenNamesMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_get(org.jboss.cache.Fqn)}
+ */
+ protected Object handleGetNodeMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_get(org.jboss.cache.Fqn, Object,
boolean)}
+ */
+ protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key,
boolean sendNodeEvent) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_addChild(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, Object, org.jboss.cache.Node, boolean)}
+ */
+ protected Object handleAddChildMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
parentFqn, Object childName, Node cn, boolean createUndoOps) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link org.jboss.cache.CacheImpl#_move(org.jboss.cache.Fqn,
org.jboss.cache.Fqn)}
+ */
+ protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_put(org.jboss.cache.transaction.GlobalTransaction, String,
Object, Object, boolean)}
+ */
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_putForExternalRead(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, Object, Object)}
+ */
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_put(org.jboss.cache.transaction.GlobalTransaction, String,
java.util.Map, boolean)}
+ */
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handles {@link
org.jboss.cache.CacheImpl#_put(org.jboss.cache.transaction.GlobalTransaction,
org.jboss.cache.Fqn, java.util.Map, boolean, boolean)}
+ */
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
+ {
+ return defaultHandlersBehavior();
+ }
+
+ /**
+ * Handlers defined here should not be called directlly. There are two scenarios in
which a handler might be called:
+ * 1 - DerivedInterceptor.super - pointless call
+ * 2 - if the logic that determines that an handler is overwritten fails. Throwing an
exception by default is for
+ * guarding against this scenario
+ */
+ private Object defaultHandlersBehavior()
+ {
+ throw new IllegalStateException("this is either called from a derived class or
nt overwritten and accidentally called. Either way, is not correct.");
+ }
+}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/NotificationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/NotificationInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/NotificationInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -1,8 +1,10 @@
package org.jboss.cache.interceptors;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.apache.commons.logging.Log;
/**
* The interceptor in charge of firing off notifications to cache listeners
@@ -12,47 +14,40 @@
*/
public class NotificationInterceptor extends BaseTransactionalContextInterceptor
{
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Log getLog()
{
- MethodCall call = ctx.getMethodCall();
+ return null;
+ }
- switch (call.getMethodId())
- {
- case MethodDeclarations.blockChannelMethodLocal_id:
- cache.getNotifier().notifyCacheBlocked(cache, true);
- break;
- case MethodDeclarations.unblockChannelMethodLocal_id:
- cache.getNotifier().notifyCacheUnblocked(cache, true);
- break;
- default:
- // do nothing
- }
+ protected Object handleBlockChannelMethod(InvocationContext ctx) throws Throwable
+ {
+ cache.getNotifier().notifyCacheBlocked(cache, true);
+ Object retVal = nextInterceptor(ctx);
+ cache.getNotifier().notifyCacheBlocked(cache, false);
+ return retVal;
+ }
- // should only kick in as a call returns.
- Object retval = super.invoke(ctx);
+ protected Object handleUnblockChannelMethod(InvocationContext ctx) throws Throwable
+ {
+ cache.getNotifier().notifyCacheUnblocked(cache, true);
+ Object retval = nextInterceptor(ctx);
+ cache.getNotifier().notifyCacheUnblocked(cache, false);
+ return retval;
+ }
- // only invoke on commit.
- switch (call.getMethodId())
- {
- case MethodDeclarations.commitMethod_id:
- // notify commit
- // ctx.is
- cache.getNotifier().notifyTransactionCompleted(ctx.getTransaction(), true,
ctx);
- break;
- case MethodDeclarations.rollbackMethod_id:
- // notify rollback
- cache.getNotifier().notifyTransactionCompleted(ctx.getTransaction(), false,
ctx);
- break;
- case MethodDeclarations.blockChannelMethodLocal_id:
- cache.getNotifier().notifyCacheBlocked(cache, false);
- break;
- case MethodDeclarations.unblockChannelMethodLocal_id:
- cache.getNotifier().notifyCacheUnblocked(cache, false);
- break;
- default:
- // do nothing;
- }
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ // notify commit
+ // ctx.is
+ Object retval = nextInterceptor(ctx);
+ cache.getNotifier().notifyTransactionCompleted(ctx.getTransaction(), true, ctx);
+ return retval;
+ }
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ Object retval = nextInterceptor(ctx);
+ cache.getNotifier().notifyTransactionCompleted(ctx.getTransaction(), false, ctx);
return retval;
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -12,8 +12,6 @@
import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeFactory;
import org.jboss.cache.NodeSPI;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.optimistic.DataVersion;
import org.jboss.cache.optimistic.DefaultDataVersion;
@@ -23,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* Used to create new {@link NodeSPI} instances in the main data structure and then copy
it into the
@@ -48,36 +47,62 @@
nodeFactory = cache.getConfiguration().getRuntimeConfig().getNodeFactory();
}
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
{
- MethodCall m = ctx.getMethodCall();
- if (MethodDeclarations.isPutMethod(m.getMethodId()))
- {
- Object[] args = m.getArgs();
- Fqn fqn = (Fqn) (args != null ? args[1] : null);
+ createNode(ctx, fqn, false);
+ return nextInterceptor(ctx);
+ }
- if (cache.peek(fqn, false) == null)
- {
- createNode(ctx, fqn, false);
- }
- }
- else if (m.getMethodId() == MethodDeclarations.moveMethodLocal_id)
- {
- Object[] args = m.getArgs();
- move(ctx, (Fqn) args[0], (Fqn) args[1]);
- }
+ protected Object handlePutDataEraseVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Map data, boolean createUndoOps, boolean eraseContent,
DataVersion dv) throws Throwable
+ {
+ createNode(ctx, fqn, false);
+ return nextInterceptor(ctx);
+ }
- return super.invoke(ctx);
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ createNode(ctx, fqn, false);
+ return nextInterceptor(ctx);
}
- private void move(InvocationContext ctx, Fqn nodeFqn, Fqn newParent)
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
{
+ createNode(ctx, fqn, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handlePutDataVersionedMethod(InvocationContext ctx, GlobalTransaction
globalTransaction, Fqn fqn, Map map, Boolean createUndoOps, DataVersion dataVersion)
throws Throwable
+ {
+ createNode(ctx, fqn, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handlePutKeyValueVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, boolean createUndoOps,
DataVersion dv) throws Throwable
+ {
+ createNode(ctx, fqn, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
+ {
+ createNode(ctx, fqn, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handlePutForExternalReadVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, DataVersion dv) throws
Throwable
+ {
+ createNode(ctx, fqn, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
+ {
List<Fqn> fqns = new ArrayList<Fqn>();
- fqns.add(newParent);
+ fqns.add((Fqn) to);
// peek into Node and get a hold of all child fqns as these need to be in the
workspace.
- NodeSPI node = cache.peek(nodeFqn, true, true);
- greedyGetFqns(fqns, node, newParent);
+ NodeSPI node = cache.peek((Fqn) from, true, true);
+ greedyGetFqns(fqns, node, (Fqn) to);
if (trace) log.trace("Adding Fqns " + fqns + " for a move()
operation.");
@@ -85,8 +110,9 @@
for (Fqn f : fqns)
{
- if (cache.peek(f, false) == null) createNode(ctx, f, true);
+ createNode(ctx, f, true);
}
+ return nextInterceptor(ctx);
}
/**
@@ -97,6 +123,7 @@
*/
private void createNode(InvocationContext ctx, Fqn targetFqn, boolean
suppressNotification) throws CacheException
{
+ if (cache.peek(targetFqn, false) != null) return;
// we do nothing if targetFqn is null
if (targetFqn == null) return;
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -15,6 +15,7 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import org.apache.commons.logging.Log;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -25,7 +26,7 @@
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
*/
-public class OptimisticInterceptor extends Interceptor
+public class OptimisticInterceptor extends MethodDispacherInterceptor
{
protected TransactionManager txManager = null;
protected TransactionTable txTable = null;
@@ -39,6 +40,11 @@
trace = log != null && log.isTraceEnabled();
}
+ protected Log getLog()
+ {
+ return null;
+ }
+
protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws
CacheException
{
OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry)
txTable.get(gtx);
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -6,18 +6,17 @@
*/
package org.jboss.cache.interceptors;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.NodeSPI;
+import org.jboss.cache.*;
import org.jboss.cache.lock.NodeLock;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.WorkspaceNode;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
+import org.jgroups.Address;
+import java.util.List;
+import java.util.Map;
+
/**
* Locks nodes during transaction boundaries. Only affects prepare/commit/rollback
method calls; other method calls
* are simply passed up the interceptor stack.
@@ -35,105 +34,97 @@
lockAcquisitionTimeout = cache.getConfiguration().getLockAcquisitionTimeout();
}
- public Object invoke(InvocationContext ctx) throws Throwable
- {
- MethodCall m = ctx.getMethodCall();
- Object retval = null;
- //we are interested in the prepare/commit/rollback
- //this is irrespective of whether we are local or remote
- switch (m.getMethodId())
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
+ {
+ //try and acquire the locks - before passing on
+ gtx = getGlobalTransaction(ctx);
+ long timeout = lockAcquisitionTimeout;
+ if (ctx.getOptionOverrides() != null
+ && ctx.getOptionOverrides().getLockAcquisitionTimeout() >= 0)
{
- case MethodDeclarations.optimisticPrepareMethod_id:
- //try and acquire the locks - before passing on
- GlobalTransaction gtx = getGlobalTransaction(ctx);
- long timeout = lockAcquisitionTimeout;
- if (ctx.getOptionOverrides() != null
- && ctx.getOptionOverrides().getLockAcquisitionTimeout() >=
0)
- {
- timeout = ctx.getOptionOverrides().getLockAcquisitionTimeout();
- }
- try
- {
- lockNodes(gtx, timeout);
- }
- catch (Throwable e)
- {
- log.debug("Caught exception attempting to lock nodes ", e);
- //we have failed - set to rollback and throw exception
- try
- {
- unlock(gtx);
- }
- catch (Throwable t)
- {
- // we have failed to unlock - now what?
- log.error("Failed to unlock nodes, after failing to lock nodes
during a prepare! Locks are possibly in a very inconsistent state now!", t);
- }
- throw e;
- }
+ timeout = ctx.getOptionOverrides().getLockAcquisitionTimeout();
+ }
+ try
+ {
+ TransactionWorkspace<?, ?> workspace = getTransactionWorkspace(gtx);
+ if (log.isDebugEnabled()) log.debug("Locking nodes in transaction workspace
for GlobalTransaction " + gtx);
- // locks have acquired so lets pass on up
- retval = super.invoke(ctx);
- break;
- case MethodDeclarations.commitMethod_id:
- case MethodDeclarations.rollbackMethod_id:
- // we need to let the stack run its commits or rollbacks first -
- // we unlock last - even if an exception occurs
- try
+ for (WorkspaceNode workspaceNode : workspace.getNodes().values())
+ {
+ NodeSPI node = workspaceNode.getNode();
+ boolean acquired = node.getLock().acquire(gtx, lockAcquisitionTimeout,
NodeLock.LockType.WRITE);
+ if (acquired)
{
- retval = super.invoke(ctx);
+ if (trace) log.trace("Acquired lock on node " + node.getFqn());
+ cache.getTransactionTable().addLock(gtx, node.getLock());
}
- finally
+ else
{
- try
- {
- unlock(getGlobalTransaction(ctx));
- }
- catch (Exception e)
- {
- // we have failed to unlock - now what?
- log.error("Failed to unlock nodes after a commit or rollback!
Locks are possibly in a very inconsistent state now!", e);
- }
+ throw new CacheException("Unable to acquire lock on node " +
node.getFqn());
}
- break;
- case MethodDeclarations.lockMethodLocal_id:
- // bail out if _lock() is being called on the tree cache... this should never
be called with o/l enabled.
- throw new CacheException("_lock() passed up the interceptor stack when
Optimistic Locking is used. This is NOT supported.");
- default:
- //we do not care, just pass up the chain.
- retval = super.invoke(ctx);
- break;
+
+ }
}
+ catch (Throwable e)
+ {
+ log.debug("Caught exception attempting to lock nodes ", e);
+ //we have failed - set to rollback and throw exception
+ try
+ {
+ unlock(gtx);
+ }
+ catch (Throwable t)
+ {
+ // we have failed to unlock - now what?
+ log.error("Failed to unlock nodes, after failing to lock nodes during a
prepare! Locks are possibly in a very inconsistent state now!", t);
+ }
+ throw e;
+ }
- return retval;
+ // locks have acquired so lets pass on up
+ return nextInterceptor(ctx);
}
- /**
- * Locks all nodes held in the transaction workspace registered with the given global
transaction.
- *
- * @param gtx global transaction which contains a workspace
- */
- private void lockNodes(GlobalTransaction gtx, long timeout) throws
InterruptedException
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
{
- TransactionWorkspace<?, ?> workspace = getTransactionWorkspace(gtx);
- if (log.isDebugEnabled()) log.debug("Locking nodes in transaction workspace
for GlobalTransaction " + gtx);
+ return transactionFinalized(ctx);
+ }
- for (WorkspaceNode workspaceNode : workspace.getNodes().values())
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ return transactionFinalized(ctx);
+ }
+
+ protected Object handleLockMethod(InvocationContext ctx, Fqn fqn, NodeLock.LockType
lockType, boolean recursive)
+ {
+ // bail out if _lock() is being called on the tree cache... this should never be
called with o/l enabled.
+ throw new CacheException("_lock() passed up the interceptor stack when
Optimistic Locking is used. This is NOT supported.");
+ }
+
+ private Object transactionFinalized(InvocationContext ctx)
+ throws Throwable
+ {
+ Object retval = null;
+ // we need to let the stack run its commits or rollbacks first -
+ // we unlock last - even if an exception occurs
+ try
+ {
+ retval = nextInterceptor(ctx);
+ }
+ finally
{
- NodeSPI node = workspaceNode.getNode();
- boolean acquired = node.getLock().acquire(gtx, lockAcquisitionTimeout,
NodeLock.LockType.WRITE);
- if (acquired)
+ try
{
- if (trace) log.trace("Acquired lock on node " + node.getFqn());
- cache.getTransactionTable().addLock(gtx, node.getLock());
+ unlock(getGlobalTransaction(ctx));
}
- else
+ catch (Exception e)
{
- throw new CacheException("Unable to acquire lock on node " +
node.getFqn());
+ // we have failed to unlock - now what?
+ log.error("Failed to unlock nodes after a commit or rollback! Locks are
possibly in a very inconsistent state now!", e);
}
-
}
+ return retval;
}
/**
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -132,7 +132,7 @@
removeDataAndNotify(workspace, workspaceNode, ctx);
break;
case MethodDeclarations.dataGravitationCleanupMethod_id:
- result = super.invoke(ctx);
+ result = nextInterceptor(ctx);
default:
if (log.isWarnEnabled()) log.warn("Cannot handle CRUD method " +
m);
break;
@@ -161,7 +161,7 @@
break;
default:
if (trace) log.trace("read Method " + m + " called - Not
handling, passing on.");
- result = super.invoke(ctx);
+ result = nextInterceptor(ctx);
break;
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -21,10 +21,13 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
+import org.apache.commons.logging.Log;
+import org.jgroups.Address;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.Map;
/**
* Replication interceptor for the optimistically locked interceptor chain. Responsible
for replicating
@@ -44,102 +47,102 @@
// we really just need a set here, but concurrent CopyOnWriteArraySet has poor
performance when writing.
private final Set<GlobalTransaction> broadcastTxs = new
ConcurrentHashSet<GlobalTransaction>();
- public Object invoke(InvocationContext ctx) throws Throwable
+
+ protected Log getLog()
{
- MethodCall m = ctx.getMethodCall();
+ return log;
+ }
+
+ protected boolean skipMethodCall(InvocationContext ctx)
+ {
// bypass for buddy group org metod calls.
- if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return
super.invoke(ctx);
-
+ if
(MethodDeclarations.isBuddyGroupOrganisationMethod(ctx.getMethodCall().getMethodId()))
+ return true;
Option optionOverride = ctx.getOptionOverrides();
if (optionOverride != null && optionOverride.isCacheModeLocal() &&
ctx.getTransaction() == null)
{
// skip replication!!
log.debug("Skipping replication for this call as cache mode is local,
forced via an option override.");
- return super.invoke(ctx);
+ return true;
}
+ return false;
+ }
- Object retval;
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
+ {
+ // pass up the chain.
+ Object retval = nextInterceptor(ctx);
+ gtx = getGlobalTransaction(ctx);
- if (log.isTraceEnabled()) log.trace("Processing method " + m);
+ if (!gtx.isRemote() && ctx.isOriginLocal())
+ {
+ // replicate the prepare call.
+ broadcastPrepare(ctx.getMethodCall(), gtx, ctx);
+ }
+ return retval;
+ }
- // on a local prepare we first run the prepare -
- //if this works broadcast it
- GlobalTransaction gtx = null; // don't initialise this here; since some method
calls may not have gtxs (such as buddy group organisation calls)
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction gtx)
throws Throwable
+ {
+ //lets broadcast the commit first
+ Throwable remoteCommitException = null;
+ gtx = getGlobalTransaction(ctx);
+ if (!gtx.isRemote() && ctx.isOriginLocal() &&
broadcastTxs.contains(gtx))
+ {
+ //we dont do anything
+ try
+ {
+ broadcastCommit(gtx, ctx);
+ }
+ catch (Throwable t)
+ {
+ log.error("A problem occurred with remote commit", t);
+ remoteCommitException = t;
+ }
+ }
- switch (m.getMethodId())
+ Object retval = nextInterceptor(ctx);
+ if (remoteCommitException != null)
{
- case MethodDeclarations.optimisticPrepareMethod_id:
- // pass up the chain.
- retval = super.invoke(ctx);
- gtx = getGlobalTransaction(ctx);
+ throw remoteCommitException;
+ }
+ return retval;
+ }
- if (!gtx.isRemote() && ctx.isOriginLocal())
- {
- // replicate the prepare call.
- broadcastPrepare(m, gtx, ctx);
- }
- break;
- case MethodDeclarations.commitMethod_id:
- //lets broadcast the commit first
- Throwable remoteCommitException = null;
- gtx = getGlobalTransaction(ctx);
- if (!gtx.isRemote() && ctx.isOriginLocal() &&
broadcastTxs.contains(gtx))
- {
- //we dont do anything
- try
- {
- broadcastCommit(gtx, ctx);
- }
- catch (Throwable t)
- {
- log.error("A problem occurred with remote commit", t);
- remoteCommitException = t;
- }
- }
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction gtx)
throws Throwable
+ {
+ // lets broadcast the rollback first
+ gtx = getGlobalTransaction(ctx);
+ Throwable remoteRollbackException = null;
+ if (!gtx.isRemote() && ctx.isOriginLocal() &&
broadcastTxs.contains(gtx))
+ {
+ //we dont do anything
+ try
+ {
+ broadcastRollback(gtx, ctx);
+ }
+ catch (Throwable t)
+ {
+ log.error(" a problem occurred with remote rollback", t);
+ remoteRollbackException = t;
+ }
- retval = super.invoke(ctx);
- if (remoteCommitException != null)
- {
- throw remoteCommitException;
- }
- break;
- case MethodDeclarations.rollbackMethod_id:
- // lets broadcast the rollback first
- gtx = getGlobalTransaction(ctx);
- Throwable remoteRollbackException = null;
- if (!gtx.isRemote() && ctx.isOriginLocal() &&
broadcastTxs.contains(gtx))
- {
- //we dont do anything
- try
- {
- broadcastRollback(gtx, ctx);
- }
- catch (Throwable t)
- {
- log.error(" a problem occurred with remote rollback", t);
- remoteRollbackException = t;
- }
-
- }
- retval = super.invoke(ctx);
- if (remoteRollbackException != null)
- {
- throw remoteRollbackException;
- }
- break;
- case MethodDeclarations.putForExternalReadMethodLocal_id:
- gtx = getGlobalTransaction(ctx);
- cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
- // and follow on to default behaviour now ...
- default:
- //it is something we do not care about
- if (log.isTraceEnabled()) log.trace("Received method " + m + "
not handling");
- retval = super.invoke(ctx);
- break;
}
+ Object retval = nextInterceptor(ctx);
+ if (remoteRollbackException != null)
+ {
+ throw remoteRollbackException;
+ }
return retval;
}
+ protected Object handlePutForExternalReadVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, DataVersion dv) throws
Throwable
+ {
+ gtx = getGlobalTransaction(ctx);
+ cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
+ return nextInterceptor(ctx);
+ }
+
private GlobalTransaction getGlobalTransaction(InvocationContext ctx)
{
// get the current gtx
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -12,13 +12,12 @@
import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeSPI;
import static org.jboss.cache.config.Configuration.CacheMode;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.optimistic.DataVersioningException;
import org.jboss.cache.optimistic.DefaultDataVersion;
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.WorkspaceNode;
import org.jboss.cache.transaction.GlobalTransaction;
+import org.jgroups.Address;
import java.util.Collection;
import java.util.List;
@@ -55,47 +54,15 @@
useTombstones = (mode == CacheMode.INVALIDATION_ASYNC) || (mode ==
CacheMode.INVALIDATION_SYNC);
}
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
{
- MethodCall m = ctx.getMethodCall();
- Object retval = null;
+ TransactionWorkspace workspace =
getTransactionWorkspace(getGlobalTransaction(ctx));
- // Methods we are interested in are prepare/commit
- // They do not go further than this interceptor
- switch (m.getMethodId())
- {
- case MethodDeclarations.optimisticPrepareMethod_id:
- // should pass in a different prepare here
- validateNodes(getGlobalTransaction(ctx));
- break;
- case MethodDeclarations.commitMethod_id:
- commit(getGlobalTransaction(ctx));
- break;
- case MethodDeclarations.rollbackMethod_id:
- rollBack(getGlobalTransaction(ctx));
- break;
- default:
- retval = super.invoke(ctx);
- break;
- }
- return retval;
- }
-
- private void validateNodes(GlobalTransaction gtx) throws CacheException
- {
- TransactionWorkspace workspace = getTransactionWorkspace(gtx);
-
// There is no guarantee that this collection is in any order!
Collection<WorkspaceNode> nodes = workspace.getNodes().values();
//we ought to have all necessary locks here so lets try and validate
if (log.isDebugEnabled()) log.debug("Validating " + nodes.size() + "
nodes.");
- simpleValidate(nodes);
- log.debug("Successfully validated nodes");
- }
-
- private void simpleValidate(Collection<WorkspaceNode> nodes) throws
DataVersioningException
- {
for (WorkspaceNode workspaceNode : nodes)
{
if (workspaceNode.isDirty())
@@ -155,11 +122,14 @@
if (trace) log.trace("Node [" + workspaceNode.getFqn() + "]
doesn't need validating as it isn't dirty");
}
}
+ log.debug("Successfully validated nodes");
+ return nextInterceptor(ctx);
}
-
- private void commit(GlobalTransaction gtx)
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
{
+ GlobalTransaction gtx = getGlobalTransaction(ctx);
+
TransactionWorkspace workspace;
try
@@ -169,7 +139,7 @@
catch (CacheException e)
{
log.warn("we can't rollback", e);
- return;
+ return nextInterceptor(ctx);
}
if (log.isDebugEnabled()) log.debug("Commiting successfully validated changes
for GlobalTransaction " + gtx);
@@ -196,7 +166,7 @@
underlyingNode.setValid(false, true);
// we need to update versions here, too
performVersionUpdate(underlyingNode, workspaceNode);
-
+
if (!useTombstones)
{
// don't retain the tombstone
@@ -206,7 +176,7 @@
throw new CacheException("Underlying node " +
underlyingNode + " has no parent");
}
- parent.removeChildDirect(underlyingNode.getFqn().getLastElement());
+ parent.removeChildDirect(underlyingNode.getFqn().getLastElement());
}
}
}
@@ -262,9 +232,18 @@
}
}
}
+ return nextInterceptor(ctx);
+ }
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ TransactionWorkspace workspace;
+ workspace = getTransactionWorkspace(getGlobalTransaction(ctx));
+ workspace.clearNodes();
+ return nextInterceptor(ctx);
}
+
private void validateNodeAndParents(NodeSPI node)
{
node.setValid(true, false);
@@ -288,10 +267,4 @@
log.trace("Setting version of node " + underlyingNode.getFqn() +
" from " + workspaceNode.getVersion() + " to " +
underlyingNode.getVersion());
}
- private void rollBack(GlobalTransaction gtx)
- {
- TransactionWorkspace workspace;
- workspace = getTransactionWorkspace(gtx);
- workspace.clearNodes();
- }
}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/PassivationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/PassivationInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/PassivationInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -5,8 +5,7 @@
import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.loader.CacheLoader;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodDeclarations;
+import org.apache.commons.logging.Log;
import java.util.Collections;
import java.util.HashMap;
@@ -20,7 +19,7 @@
* @author <a href="mailto:{hmesha@novell.com}">{Hany Mesha}</a>
* @version $Id$
*/
-public class PassivationInterceptor extends Interceptor implements
PassivationInterceptorMBean
+public class PassivationInterceptor extends MethodDispacherInterceptor implements
PassivationInterceptorMBean
{
protected CacheLoader loader = null;
@@ -32,58 +31,43 @@
this.loader = cache.getCacheLoaderManager().getCacheLoader();
}
+ protected Log getLog()
+ {
+ return null;
+ }
+
/**
- * Notifies the cache instance listeners that the evicted node is about to
+ * Notifies the cache instance listeners that the evicted node is about to
* be passivated and stores the evicted node and its attributes back to the
* store using the CacheLoader.
- *
- * @return
- * @throws Throwable
*/
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Object handleEvictMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
- MethodCall m = ctx.getMethodCall();
- // hmesha- We don't need to handle transaction during passivation since
- // passivation happens local to a node and never replicated
-
- // evict() method need to be applied to the CacheLoader before passing the call on
- if (m.getMethodId() == MethodDeclarations.evictNodeMethodLocal_id)
+ try
{
- Object[] args = m.getArgs();
- Fqn fqn = (Fqn) args[0];
- try
+ // evict method local doesn't hold attributes therefore we have
+ // to get them manually
+ Map attributes = getNodeAttributes(fqn);
+ // notify listeners that this node is about to be passivated
+ cache.getNotifier().notifyNodePassivated(fqn, true, attributes, ctx);
+ loader.put(fqn, attributes);
+ cache.getNotifier().notifyNodePassivated(fqn, false, Collections.emptyMap(),
ctx);
+ if (getStatisticsEnabled() &&
configuration.getExposeManagementStatistics())
{
- // synchronizations now handled in the cache loader implementation
-// synchronized (this)
-// {
- // evict method local doesn't hold attributes therefore we have
- // to get them manually
- Map attributes = getNodeAttributes(fqn);
- // notify listeners that this node is about to be passivated
- cache.getNotifier().notifyNodePassivated(fqn, true, attributes, ctx);
-
- loader.put(fqn, attributes);
-
- cache.getNotifier().notifyNodePassivated(fqn, false, Collections.emptyMap(),
ctx);
-// }
-
- if (getStatisticsEnabled() &&
configuration.getExposeManagementStatistics())
- {
- m_passivations.getAndIncrement();
- }
+ m_passivations.getAndIncrement();
}
- catch (NodeNotLoadedException e)
+ }
+ catch (NodeNotLoadedException e)
+ {
+ if (log.isTraceEnabled())
{
- if (log.isTraceEnabled())
- {
- log.trace("Node " + fqn + " not loaded in memory;
passivation skipped");
- }
+ log.trace("Node " + fqn + " not loaded in memory; passivation
skipped");
}
}
-
- return super.invoke(ctx);
+ return nextInterceptor(ctx);
}
+
public long getPassivations()
{
return m_passivations.get();
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -6,28 +6,18 @@
*/
package org.jboss.cache.interceptors;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheImpl;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.NodeSPI;
+import org.apache.commons.logging.Log;
+import org.jboss.cache.*;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.NodeLock;
import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import org.jgroups.Address;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
/**
* An interceptor that handles locking. When a TX is associated, we register
@@ -37,8 +27,9 @@
*
* @author Bela Ban
* @version $Id$
+ * //todo = try to see how acquireLuckWithTimeout works inline
*/
-public class PessimisticLockInterceptor extends Interceptor
+public class PessimisticLockInterceptor extends MethodDispacherInterceptor
{
private TransactionTable tx_table = null;
@@ -58,182 +49,207 @@
lock_acquisition_timeout = cache.getConfiguration().getLockAcquisitionTimeout();
}
+ protected Log getLog()
+ {
+ return log;
+ }
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
{
- MethodCall m = ctx.getMethodCall();
- Fqn fqn = null;
- NodeLock.LockType lock_type = NodeLock.LockType.NONE;
- Object[] args = m.getArgs();
- boolean lockNecessary = false;
- boolean locksAlreadyObtained = false;
+ return handlePutMethod(ctx, fqn);
+ }
- if (log.isTraceEnabled()) log.trace("PessimisticLockInterceptor invoked for
method " + m);
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
+ {
+ return handlePutMethod(ctx, fqn);
+ }
+
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ return handlePutMethod(ctx, fqn);
+ }
+
+ private Object handlePutMethod(InvocationContext ctx, Fqn fqn)
+ throws Throwable
+ {
+ log.trace("Suppressing locking");
if (ctx.getOptionOverrides() != null &&
ctx.getOptionOverrides().isSuppressLocking())
{
- log.trace("Suppressing locking");
- switch (m.getMethodId())
- {
- case MethodDeclarations.putDataMethodLocal_id:
- case MethodDeclarations.putDataEraseMethodLocal_id:
- case MethodDeclarations.putKeyValMethodLocal_id:
- log.trace("Creating nodes if necessary");
- createNodes((Fqn) args[1], ctx.getGlobalTransaction());
- break;
- }
-
- return super.invoke(ctx);
+ log.trace("Creating nodes if necessary");
+ createNodes(fqn, ctx.getGlobalTransaction());
+ } else
+ {
+ acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.WRITE, false, true, false,
false, false, false);
}
+ return nextInterceptor(ctx);
+ }
- /** List<IdentityLock> locks. Locks acquired during the current method; will
be released later by UnlockInterceptor.
- * This list is only populated when there is no TX, otherwise the TransactionTable
maintains the locks
- * (keyed by TX) */
- // List locks=null;
+ protected Object handleLockMethod(InvocationContext ctx, Fqn fqn, NodeLock.LockType
lockType, boolean recursive) throws Throwable
+ {
+ acquireLocksWithTimeout(ctx, fqn, lockType, recursive, false, false, false, false,
false);
+ return null;
+ }
- boolean recursive = false;
- boolean createIfNotExists = false;
- boolean zeroLockTimeout = false;// only used if the call is an evict() call. See
JBCACHE-794
- boolean isDeleteOperation = false;// needed for JBCACHE-871
- boolean isEvictOperation = false;
- boolean isRemoveDataOperation = false;
+ protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx,
List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
+ {
+ // commit propagated up from the tx interceptor
+ commit(ctx.getGlobalTransaction());
+ Object retVal = nextInterceptor(ctx);
+ cleanup(ctx.getGlobalTransaction());
+ return retVal;
+ }
- // 1. Determine the type of lock (read, write, or none) depending on the method. If
no lock is required, invoke
- // the method, then return immediately
- // Set the Fqn
- switch (m.getMethodId())
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
+ {
+ return handlePrepareMethod(ctx, gtx, modifications, address, onePhaseCommit);
+ }
+
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ commit(globalTransaction);
+ if (log.isTraceEnabled())
{
- case MethodDeclarations.moveMethodLocal_id:
- fqn = (Fqn) args[0];
- obtainLocksForMove(ctx, fqn, (Fqn) args[1]);
- locksAlreadyObtained = true;
- lockNecessary = true;
- isDeleteOperation = true;
- break;
- case MethodDeclarations.putForExternalReadMethodLocal_id:
- createIfNotExists = true;
- fqn = (Fqn) args[1];
- lock_type = NodeLock.LockType.WRITE;
- zeroLockTimeout = true;
- break;
- case MethodDeclarations.removeNodeMethodLocal_id:
- isDeleteOperation = true;
- fqn = (Fqn) args[1];
- lock_type = NodeLock.LockType.WRITE;
- recursive = true;// remove node and *all* child nodes
- createIfNotExists = true;
- break;
- case MethodDeclarations.putDataMethodLocal_id:
- case MethodDeclarations.putDataEraseMethodLocal_id:
- case MethodDeclarations.putKeyValMethodLocal_id:
- createIfNotExists = true;
- fqn = (Fqn) args[1];
- lock_type = NodeLock.LockType.WRITE;
- break;
- case MethodDeclarations.removeKeyMethodLocal_id:
- case MethodDeclarations.removeDataMethodLocal_id:
- isRemoveDataOperation = true;
- case MethodDeclarations.addChildMethodLocal_id:
- fqn = (Fqn) args[1];
- lock_type = NodeLock.LockType.WRITE;
- break;
- case MethodDeclarations.evictNodeMethodLocal_id:
- zeroLockTimeout = true;
- fqn = (Fqn) args[0];
- lock_type = NodeLock.LockType.WRITE;
- isEvictOperation = true;
- break;
- case MethodDeclarations.getKeyValueMethodLocal_id:
- case MethodDeclarations.getNodeMethodLocal_id:
- case MethodDeclarations.getKeysMethodLocal_id:
- case MethodDeclarations.getChildrenNamesMethodLocal_id:
- case MethodDeclarations.releaseAllLocksMethodLocal_id:
- case MethodDeclarations.printMethodLocal_id:
- fqn = (Fqn) args[0];
- lock_type = NodeLock.LockType.READ;
- break;
- case MethodDeclarations.lockMethodLocal_id:
- fqn = (Fqn) args[0];
- lock_type = (NodeLock.LockType) args[1];
- recursive = (Boolean) args[2];
- break;
- case MethodDeclarations.commitMethod_id:
- // commit propagated up from the tx interceptor
- commit(ctx.getGlobalTransaction());
- break;
- case MethodDeclarations.rollbackMethod_id:
- // rollback propagated up from the tx interceptor
- rollback(ctx.getGlobalTransaction());
- break;
- default:
- if (isOnePhaseCommitPrepareMehod(m))
- {
- // commit propagated up from the tx interceptor
- commit(ctx.getGlobalTransaction());
- }
- break;
+ log.trace("bypassed locking as method commit() doesn't require
locking");
}
+ Object retVal = nextInterceptor(ctx);
+ cleanup(globalTransaction);
+ return retVal;
+ }
- // Lock the node (must be either read or write if we get here)
- // If no TX: add each acquired lock to the list of locks for this method (locks)
- // If TX: [merge code from TransactionInterceptor]: register with TxManager, on
commit/rollback,
- // release the locks for the given TX
- boolean created = false;
- if (fqn != null)
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ rollback(globalTransaction);
+ if (log.isTraceEnabled())
{
- if (!locksAlreadyObtained)
- {
- long timeout = zeroLockTimeout ? 0 : getLockAcquisitionTimeout(ctx);
- // make sure we can bail out of this loop
- long cutoffTime = System.currentTimeMillis() + timeout;
- boolean firstTry = true;
- do
- {
- // this is an additional check to make sure we don't try for too
long.
- if (!firstTry && System.currentTimeMillis() > cutoffTime) throw
new TimeoutException("Unable to acquire lock on Fqn " + fqn + " after
" + timeout + " millis");
- created = lock(ctx, fqn, lock_type, recursive, createIfNotExists, timeout,
isDeleteOperation, isEvictOperation, isRemoveDataOperation);
- firstTry = false;
- }
- while (createIfNotExists && cache.peek(fqn, true) == null);// keep
trying until we have the lock (fixes concurrent remove())
- }
+ log.trace("bypassed locking as method rollback() doesn't require
locking");
}
- else if (!lockNecessary)
+ Object retVal = nextInterceptor(ctx);
+ cleanup(globalTransaction);
+ return retVal;
+ }
+
+ protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
+ {
+ long timeout = getLockAcquisitionTimeout(ctx);
+ // this call will ensure the node gets a WL and it's current parent gets RL.
+ if (log.isTraceEnabled()) log.trace("Attempting to get WL on node to be moved
[" + from + "]");
+ lock(ctx, from, NodeLock.LockType.WRITE, true, false, timeout, true, false,
false);
+ //now for an RL for the new parent.
+ if (log.isTraceEnabled()) log.trace("Attempting to get RL on new parent
[" + to + "]");
+ lock(ctx, to, NodeLock.LockType.READ, true, false, timeout, false, false, false);
+ Object retValue = nextInterceptor(ctx);
+ // do a REAL remove here.
+ NodeSPI n = cache.peek(from, true);
+ if (n != null)
{
- if (log.isTraceEnabled())
- {
- log.trace("bypassed locking as method " + m.getName() + "()
doesn't require locking");
- }
+ lockManager.getLock(n).releaseAll(Thread.currentThread());
}
- if (m.getMethodId() == MethodDeclarations.lockMethodLocal_id)
+ return retValue;
+ }
+
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ boolean created = acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.WRITE, true,
true, false, true, false, false);
+ Object retVal = nextInterceptor(ctx);
+ if (ctx.getGlobalTransaction() == null)
{
- return null;
- }
- Object o = super.invoke(ctx);
- // FIXME this should be done in UnlockInterceptor, but I didn't want
- // to add the removedNodes map to CacheImpl
- if (isDeleteOperation && ctx.getGlobalTransaction() == null)
- {
- //CacheIml._move internally removes the source node. 'if clause' solve
the scenario when a node is
- // moved in the same place, i.e. it will be removed from the source - which is
also destination - so it ends
- // up being removed for good
- if (!(MethodDeclarations.moveMethodLocal_id == m.getMethodId()))
- {
- ((CacheImpl) cache).realRemove(fqn, true);
- }
- // do a REAL remove here.
+ ((CacheImpl) cache).realRemove(fqn, true);
NodeSPI n = cache.peek(fqn, true);
if (n != null)
{
lockManager.getLock(n).releaseAll(Thread.currentThread());
}
}
- else if (m.getMethodId() == MethodDeclarations.commitMethod_id ||
isOnePhaseCommitPrepareMehod(m) || m.getMethodId() ==
MethodDeclarations.rollbackMethod_id)
- {
- cleanup(ctx.getGlobalTransaction());
- }
// if this is a delete op and we had to create the node, return a FALSE as nothing
*really* was deleted!
- return isDeleteOperation && created ? false : o;
+ return created ? false : retVal;
}
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
+ {
+ acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.WRITE, false, true, true,
false, false, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
+ {
+ return handleRemoveDataMethod(ctx, tx, fqn, createUndoOps);
+ }
+
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.WRITE, false, false, false,
false, false, true);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleAddChildMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
parentFqn, Object childName, Node cn, boolean createUndoOps) throws Throwable
+ {
+ acquireLocksWithTimeout(ctx, parentFqn, NodeLock.LockType.WRITE, false, false,
false, false, false, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleEvictMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.WRITE, false, false, true,
false, true, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key,
boolean sendNodeEvent) throws Throwable
+ {
+ return acquireNonRecursiveReadLock(ctx, fqn);
+ }
+
+ private Object acquireNonRecursiveReadLock(InvocationContext ctx, Fqn fqn)
+ throws Throwable
+ {
+ acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.READ, false, false, false,
false, false, false);
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handleGetNodeMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return acquireNonRecursiveReadLock(ctx, fqn);
+ }
+
+ protected Object handleGetKeysMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return acquireNonRecursiveReadLock(ctx, fqn);
+ }
+
+ protected Object handleGetChildrenNamesMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return acquireNonRecursiveReadLock(ctx, fqn);
+ }
+
+ protected Object handlePrintMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return acquireNonRecursiveReadLock(ctx, fqn);
+ }
+
+ protected Object handleReleaseAllLocksMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return acquireNonRecursiveReadLock(ctx, fqn);
+ }
+
+ private boolean acquireLocksWithTimeout(InvocationContext ctx, Fqn fqn,
NodeLock.LockType lock_type, boolean recursive, boolean createIfNotExists, boolean
zeroLockTimeout, boolean deleteOperation, boolean evictOperation, boolean
removeDataOperation)
+ throws InterruptedException
+ {
+ boolean created;
+ long timeout = zeroLockTimeout ? 0 : getLockAcquisitionTimeout(ctx);
+ // make sure we can bail out of this loop
+ long cutoffTime = System.currentTimeMillis() + timeout;
+ boolean firstTry = true;
+ do
+ {
+ // this is an additional check to make sure we don't try for too
long.
+ if (!firstTry && System.currentTimeMillis() > cutoffTime) throw new
TimeoutException("Unable to acquire lock on Fqn " + fqn + " after " +
timeout + " millis");
+ created = lock(ctx, fqn, lock_type, recursive, createIfNotExists, timeout,
deleteOperation, evictOperation, removeDataOperation);
+ firstTry = false;
+ }
+ while (createIfNotExists && cache.peek(fqn, true) == null);// keep trying
until we have the lock (fixes concurrent remove())
+ return created;
+ }
+
+ //todo move to InvocationContext as it is information expert
private long getLockAcquisitionTimeout(InvocationContext ctx)
{
long timeout = lock_acquisition_timeout;
@@ -246,27 +262,9 @@
}
- private void obtainLocksForMove(InvocationContext ctx, Fqn node, Fqn parent) throws
InterruptedException
- {
- // parent node (new parent) and current node's existing parent should both get
RLs.
- // node should have a WL.
-
- long timeout = getLockAcquisitionTimeout(ctx);
-
- // this call will ensure the node gets a WL and it's current parent gets RL.
- if (log.isTraceEnabled()) log.trace("Attempting to get WL on node to be moved
[" + node + "]");
- lock(ctx, node, NodeLock.LockType.WRITE, true, false, timeout, true, false,
false);
-
- //now for an RL for the new parent.
- if (log.isTraceEnabled()) log.trace("Attempting to get RL on new parent
[" + parent + "]");
- lock(ctx, parent, NodeLock.LockType.READ, true, false, timeout, false, false,
false);
- }
-
-
/**
* Locks a given node.
*
- * @param fqn
* @param lock_type DataNode.LOCK_TYPE_READ, DataNode.LOCK_TYPE_WRITE or
DataNode.LOCK_TYPE_NONE
* @param recursive Lock children recursively
* @return true if the node had to be created
@@ -461,7 +459,7 @@
if (!isTargetNode && cache.peek(new Fqn(currentNode.getFqn(),
targetFqn.get(currentNodeIndex + 1)), false) == null)
{
- return isPutOperation;// we're at a node in the tree, not yet at the
target node, and we need to create the next node. So we need a WL here.
+ return isPutOperation;// we're at a node in the tree, not yet at the
target node, and we need to create the nextInterceptor node. So we need a WL here.
}
}
return lock_type == NodeLock.LockType.WRITE && isTargetNode &&
(isPutOperation || isRemoveOperation || isEvictOperation ||
isRemoveDataOperation);//normal operation, write lock explicitly requested and this is the
target to be written to.
@@ -516,6 +514,7 @@
}
}
+ //todo move to lock tabe as it is information xprt
private List<NodeLock> getLocks(Thread currentThread)
{
// This sort of looks like a get/put race condition, but
@@ -565,6 +564,7 @@
* Remove all locks held by <tt>tx</tt>, remove the transaction from the
transaction table
*
* @param gtx
+ * todo move this logic in txTable as it is information expert for this class (this
looks procedural)
*/
private void commit(GlobalTransaction gtx)
{
@@ -590,6 +590,9 @@
}
}
+ /**
+ * todo move this logic in txTable as it is information expert for this class (this
looks procedural)
+ */
private void cleanup(GlobalTransaction gtx)
{
if (log.isTraceEnabled()) log.trace("Cleaning up locks for gtx " + gtx);
@@ -620,7 +623,7 @@
* <li>Remove all temporary nodes created by the current TX</li>
* </ol>
*
- * @param tx
+ * todo move this logic in txTable as it is information expert for this class (this
looks procedural)
*/
private void rollback(GlobalTransaction tx)
{
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -1,12 +1,19 @@
package org.jboss.cache.interceptors;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.optimistic.DataVersion;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.transaction.GlobalTransaction;
+import org.apache.commons.logging.Log;
+import org.jgroups.Address;
+import java.util.List;
+import java.util.Map;
+
/**
* Takes care of replicating modifications to other nodes in a cluster. Also
* listens for prepare(), commit() and rollback() messages which are received
@@ -18,105 +25,200 @@
public class ReplicationInterceptor extends BaseRpcInterceptor
{
- public Object invoke(InvocationContext ctx) throws Throwable
+ protected Log getLog()
{
- MethodCall m = ctx.getMethodCall();
- GlobalTransaction gtx = ctx.getGlobalTransaction();
+ return log;
+ }
- // bypass for buddy group org metod calls.
- if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return
super.invoke(ctx);
- boolean isLocalCommitOrRollback = gtx != null && !gtx.isRemote() &&
(m.getMethodId() == MethodDeclarations.commitMethod_id || m.getMethodId() ==
MethodDeclarations.rollbackMethod_id);
-
- if (log.isTraceEnabled()) log.trace("isLocalCommitOrRollback? " +
isLocalCommitOrRollback + "; gtx = " + gtx);
-
- // pass up the chain if not a local commit or rollback (in which case replicate
first)
- Object o = isLocalCommitOrRollback ? null : super.invoke(ctx);
-// ctx = cache.getInvocationContext();
-
+ protected boolean skipMethodCall(InvocationContext ctx)
+ {
Option optionOverride = ctx.getOptionOverrides();
-
if (optionOverride != null && optionOverride.isCacheModeLocal() &&
ctx.getTransaction() == null)
{
log.trace("skip replication");
- return isLocalCommitOrRollback ? super.invoke(ctx) : o;
+ return true;
}
+ return false;
+ }
- // could be potentially TRANSACTIONAL. If so, we register for transaction
completion callbacks (if we
- // have not yet done so
- if (ctx.getTransaction() != null)
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction gtx)
throws Throwable
+ {
+ if (skipReplciationOfTransactionMethod(ctx))
{
- if (gtx != null && !gtx.isRemote())
- {
- // lets see what sort of method we've got.
- switch (m.getMethodId())
- {
- case MethodDeclarations.commitMethod_id:
- // REPL_ASYNC will result in only a prepare() method - 1 phase commit.
- if (containsModifications(ctx))
- replicateCall(m, configuration.isSyncCommitPhase(),
ctx.getOptionOverrides());
- // now pass up the chain
- o = super.invoke(ctx);
- break;
- case MethodDeclarations.prepareMethod_id:
- if (containsModifications(ctx))
- {
- // this is a prepare method
- runPreparePhase(m, gtx, ctx);
- }
- break;
- case MethodDeclarations.rollbackMethod_id:
- // REPL_ASYNC will result in only a prepare() method - 1 phase commit.
- if (containsModifications(ctx) && !ctx.isLocalRollbackOnly())
- {
- replicateCall(m, configuration.isSyncRollbackPhase(),
ctx.getOptionOverrides());
- }
- // now pass up the chain
- o = super.invoke(ctx);
- break;
- case MethodDeclarations.putForExternalReadMethodLocal_id:
- cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
- }
- }
+ return nextInterceptor(ctx);
}
- else if (MethodDeclarations.isCrudMethod(m.getMethodId()))
+ replicateCall(ctx.getMethodCall(), configuration.isSyncCommitPhase(),
ctx.getOptionOverrides());
+ return nextInterceptor(ctx);
+ }
+
+ protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx,
List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
+ {
+ if (skipReplciationOfTransactionMethod(ctx))
{
- // NON-TRANSACTIONAL and CRUD method
- if (log.isTraceEnabled()) log.trace("Non-tx crud meth");
+ return nextInterceptor(ctx);
+ }
+ Object retVal = nextInterceptor(ctx);
+ runPreparePhase(ctx.getMethodCall(), gtx, ctx);
+ return retVal;
+ }
- // don't re-broadcast if we've received this from another cache in the
cluster.
- if (ctx.isOriginLocal())
- {
- handleReplicatedMethod(m, isSynchronous(optionOverride), ctx);
- }
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction gtx)
throws Throwable
+ {
+ if (skipReplciationOfTransactionMethod(ctx))
+ {
+ return nextInterceptor(ctx);
}
- else
+ if (!ctx.isLocalRollbackOnly())
{
- if (log.isTraceEnabled()) log.trace("Non-tx and non crud meth");
+ replicateCall(ctx.getMethodCall(), configuration.isSyncRollbackPhase(),
ctx.getOptionOverrides());
}
+ return nextInterceptor(ctx);
- return o;
}
- void handleReplicatedMethod(MethodCall m, boolean synchronous, InvocationContext ctx)
throws Throwable
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value) throws Throwable
{
- if (log.isTraceEnabled())
+ if (isTransactionalAndLocal(ctx))
{
- log.trace("invoking method " + m + ", members=" +
cache.getMembers() + ", mode=" +
- configuration.getCacheMode() + ", exclude_self=" + true +
", timeout=" +
- configuration.getSyncReplTimeout());
+ Object returnValue = nextInterceptor(ctx);
+ cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
+ return returnValue;
+ } else {
+ return handleCrudMethod(ctx);
}
- if (!synchronous || m.getMethodId() ==
MethodDeclarations.putForExternalReadMethodLocal_id)
+ }
+
+ /**
+ * It does not make sense replicating a transaction method(commit, rollback, prepare)
if one of the following:
+ * <pre>
+ * - call was not initiated here, but on other member of the cluster
+ * - there is no transaction. Why calling a commit if no transaction going on?
+ * - the current transaction did not modufy any data, so other members are not
aware of it
+ * </pre>
+ */
+ private boolean skipReplciationOfTransactionMethod(InvocationContext ctx)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ boolean isInitiatedHere = gtx != null && !gtx.isRemote();
+ if (log.isTraceEnabled()) log.trace("isInitiatedHere? " + isInitiatedHere
+ "; gtx = " + gtx);
+ return !isTransactionalAndLocal(ctx) || !containsModifications(ctx);
+ }
+
+ /**
+ * The call runs in a transaction and it was initiated on this node of the cluster.
+ */
+ private boolean isTransactionalAndLocal(InvocationContext ctx)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ boolean isInitiatedHere = gtx != null && !gtx.isRemote();
+ return isInitiatedHere && (ctx.getTransaction() != null);
+ }
+
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handlePutKeyValueVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, boolean createUndoOps,
DataVersion dv) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handlePutDataEraseVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Map data, boolean createUndoOps, boolean eraseContent,
DataVersion dv) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handlePutDataVersionedMethod(InvocationContext ctx, GlobalTransaction
globalTransaction, Fqn fqn, Map map, Boolean createUndoOps, DataVersion dataVersion)
throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handlePutForExternalReadVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, DataVersion dv) throws
Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handleDataGravitationCleanupMethod(InvocationContext ctx,
GlobalTransaction globalTransaction, Fqn primary, Fqn backup) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handleRemoveNodeVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, boolean createUndoOps, DataVersion dv) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handleRemoveKeyVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, boolean createUndoOps, DataVersion dv) throws
Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ protected Object handleRemoveDataVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, boolean createUndoOps, DataVersion dv) throws Throwable
+ {
+ return handleCrudMethod(ctx);
+ }
+
+ /**
+ * If we are within one transaction we won't do any replication as replication
would only be performed at commit time.
+ * If the operation didn't originate locally we won't do any replication
either.
+ */
+ private Object handleCrudMethod(InvocationContext ctx)
+ throws Throwable
+ {
+ if (ctx.getTransaction() == null && ctx.isOriginLocal() )
{
- // 2. Replicate change to all *other* members (exclude self !)
- replicateCall(m, false, ctx.getOptionOverrides());
+ MethodCall m = ctx.getMethodCall();
+ if (log.isTraceEnabled())
+ {
+ log.trace("invoking method " + m + ", members=" +
cache.getMembers() + ", mode=" +
+ configuration.getCacheMode() + ", exclude_self=" + true +
", timeout=" +
+ configuration.getSyncReplTimeout());
+ }
+ if (!isSynchronous(ctx.getOptionOverrides()) || m.getMethodId() ==
MethodDeclarations.putForExternalReadMethodLocal_id)
+ {
+ // 2. Replicate change to all *other* members (exclude self !)
+ replicateCall(m, false, ctx.getOptionOverrides());
+ }
+ else
+ {
+ // REVISIT Needs to exclude itself and apply the local change manually.
+ // This is needed such that transient field is modified properly in-VM.
+ replicateCall(m, true, ctx.getOptionOverrides());
+ }
}
- else
- {
- // REVISIT Needs to exclude itself and apply the local change manually.
- // This is needed such that transient field is modified properly in-VM.
- replicateCall(m, true, ctx.getOptionOverrides());
- }
+ return nextInterceptor(ctx);
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -6,9 +6,8 @@
*/
package org.jboss.cache.interceptors;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.ReplicationException;
+import org.jboss.cache.*;
+import org.jboss.cache.lock.NodeLock;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.marshall.MethodCall;
@@ -18,6 +17,8 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionEntry;
+import org.apache.commons.logging.Log;
+import org.jgroups.Address;
import javax.transaction.Status;
import javax.transaction.Synchronization;
@@ -48,108 +49,256 @@
private Map rollbackTransactions = new ConcurrentHashMap(16);
private long m_prepares = 0;
private long m_commits = 0;
+
private long m_rollbacks = 0;
+ protected Log getLog()
+ {
+ return log;
+ }
- /**
- * Set<GlobalTransaction> of GlobalTransactions that originated somewhere else
(we didn't create them).
- * This is a result of a PREPARE phase. GlobalTransactions in this list should be
ignored by this
- * interceptor when registering for TX completion
- */
- private Map remoteTransactions = new ConcurrentHashMap();
- public Object invoke(InvocationContext ctx) throws Throwable
+ @SuppressWarnings("unchecked")
+ protected Object handleOptimisticPrepareMethod(InvocationContext ctx,
GlobalTransaction gtx, List modifications, Map data, Address address, boolean
onePhaseCommit) throws Throwable
{
- MethodCall m = ctx.getMethodCall();
-
- // skip block/unblock calls
- if (MethodDeclarations.isBlockUnblockMethod(m.getMethodId())) return
super.invoke(ctx);
-
- if (log.isTraceEnabled())
- {
- log.trace("(" + cache.getLocalAddress() + ") call on method
[" + m + "]");
- }
- // bypass for buddy group org metod calls.
- if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return
super.invoke(ctx);
-
+ Object result = null;
boolean scrubTxsOnExit = false;
- Option optionOverride = ctx.getOptionOverrides();
- Object result = null;
-
+ // this is a prepare, commit, or rollback.
+ if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " +
ctx.getGlobalTransaction());
try
{
- // first of all deal with tx methods - these are only going to be
- // prepare/commit/rollback called by a remote cache, since calling
- // such methods on CacheImpl directly would fail.
-
- if (MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId()))
+ if (ctx.getGlobalTransaction().isRemote())
{
- // this is a prepare, commit, or rollback.
- if (log.isDebugEnabled()) log.debug("Got gtx from invocation context
" + ctx.getGlobalTransaction());
-
- if (ctx.getGlobalTransaction().isRemote())
remoteTransactions.put(ctx.getGlobalTransaction(), NULL);
-
- switch (m.getMethodId())
+ result = handleRemotePrepare(ctx, modifications, onePhaseCommit);
+ scrubTxsOnExit = true;
+ if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
{
- case MethodDeclarations.optimisticPrepareMethod_id:
- case MethodDeclarations.prepareMethod_id:
- if (ctx.getGlobalTransaction().isRemote())
- {
- result = handleRemotePrepare(ctx, m);
- scrubTxsOnExit = true;
- if (configuration.getExposeManagementStatistics() &&
getStatisticsEnabled())
- {
- m_prepares++;
- }
- }
- else
- {
- if (log.isTraceEnabled()) log.trace("received my own message
(discarding it)");
- result = null;
- }
- break;
- case MethodDeclarations.commitMethod_id:
- case MethodDeclarations.rollbackMethod_id:
- if (ctx.getGlobalTransaction().isRemote())
- {
- result = handleRemoteCommitRollback(ctx);
- scrubTxsOnExit = true;
- }
- else
- {
- if (log.isTraceEnabled()) log.trace("received my own message
(discarding it)");
- result = null;
- }
- break;
+ m_prepares++;
}
}
else
{
- // non-transaction lifecycle method.
- result = handleNonTxMethod(ctx);
+ if (log.isTraceEnabled()) log.trace("received my own message (discarding
it)");
+ result = null;
}
- }
- catch (Exception e)
+ } catch (Throwable e)
{
- if (optionOverride == null || !optionOverride.isFailSilently()) throw e;
- log.trace("There was a problem handling this request, but " +
- "failSilently was set, so suppressing exception", e);
+ throwIfNeeded(ctx, e);
}
finally
{
- // we should scrub txs after every call to prevent race conditions
- // basically any other call coming in on the same thread and hijacking any
running tx's
- // was highlighted in JBCACHE-606
+ scrubOnExist(ctx, scrubTxsOnExit);
+ }
+ return result;
+ }
- if (scrubTxsOnExit)
+ protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx,
List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
+ {
+ return handleOptimisticPrepareMethod(ctx, gtx, modification, null, coordinator,
onePhaseCommit);
+ }
+
+ @SuppressWarnings("Unchecked")
+ protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ Object result = null;
+ boolean scrubTxsOnExit = false;
+ try
+ {
+ if (log.isTraceEnabled())
{
- setTransactionalContext(null, null, ctx);
+ log.trace("(" + cache.getLocalAddress() + ") call on method
[" + ctx.getMethodCall() + "]");
}
+ if (ctx.getGlobalTransaction().isRemote())
+ {
+ result = handleRemoteCommitRollback(ctx);
+ scrubTxsOnExit = true;
+ }
+ else
+ {
+ if (log.isTraceEnabled()) log.trace("received my own message (discarding
it)");
+ result = null;
+ }
+ } catch (Throwable throwable)
+ {
+ throwIfNeeded(ctx, throwable);
+ } finally
+ {
+ scrubOnExist(ctx, scrubTxsOnExit);
}
- return result;
+ return result;
}
+
+ protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction
globalTransaction) throws Throwable
+ {
+ return handleCommitMethod(ctx, globalTransaction);
+ }
+
+ protected Object handleLockMethod(InvocationContext ctx, Fqn fqn, NodeLock.LockType
lockType, boolean recursive) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleRemoveDataVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, boolean createUndoOps, DataVersion dv) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleRemoveKeyVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, boolean createUndoOps, DataVersion dv) throws
Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleRemoveNodeVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, boolean createUndoOps, DataVersion dv) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleDataGravitationCleanupMethod(InvocationContext ctx,
GlobalTransaction globalTransaction, Fqn primary, Fqn backup) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePutForExternalReadVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, DataVersion dv) throws
Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePutKeyValueVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Object key, Object value, boolean createUndoOps,
DataVersion dv) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePutDataVersionedMethod(InvocationContext ctx, GlobalTransaction
globalTransaction, Fqn fqn, Map map, Boolean createUndoOps, DataVersion dataVersion)
throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePutDataEraseVersionedMethod(InvocationContext ctx,
GlobalTransaction gtx, Fqn fqn, Map data, boolean createUndoOps, boolean eraseContent,
DataVersion dv) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleExistsMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleEvictVersionedNodeMethod(InvocationContext ctx, Fqn fqn,
DataVersion dataVersion) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleEvictMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, Object key, boolean createUndoOps) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx,
Fqn fqn, boolean createUndoOps) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleGetDataMapMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleGetKeysMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePrintMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleReleaseAllLocksMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleGetChildrenNamesMethod(InvocationContext ctx, Fqn fqn) throws
Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleGetNodeMethod(InvocationContext ctx, Fqn fqn) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key,
boolean sendNodeEvent) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleAddChildMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
parentFqn, Object childName, Node cn, boolean createUndoOps) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws
Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx,
Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePutForExternalReadMethod(InvocationContext ctx,
GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn
fqn, Map data, boolean createUndoOps) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt,
Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
+ {
+ return handleNonTxMethod(ctx);
+ }
+
+ private boolean throwIfNeeded(InvocationContext ctx, Throwable e) throws Throwable
+ {
+ Option optionOverride = ctx.getOptionOverrides();
+ boolean shouldRethtrow = optionOverride == null ||
!optionOverride.isFailSilently();
+ if (!shouldRethtrow)
+ {
+ log.trace("There was a problem handling this request, but failSilently was
set, so suppressing exception", e);
+ }
+ throw e;
+ }
+
+ /**
+ * we should scrub txs after every call to prevent race conditions
+ * basically any other call coming in on the same thread and hijacking any running
tx's
+ * was highlighted in JBCACHE-606
+ */
+ private void scrubOnExist(InvocationContext ctx, boolean scrubTxsOnExit)
+ {
+ if (scrubTxsOnExit)
+ {
+ setTransactionalContext(null, null, ctx);
+ }
+ }
+
public long getPrepares()
{
return m_prepares;
@@ -181,12 +330,11 @@
return retval;
}
- private Object handleRemotePrepare(InvocationContext ctx, MethodCall m) throws
Throwable
+ // --------------------------------------------------------------
+
+ private Object handleRemotePrepare(InvocationContext ctx,List<MethodCall>
modifications, boolean onePhase ) throws Throwable
{
GlobalTransaction gtx = ctx.getGlobalTransaction();
- List<MethodCall> modifications = (List<MethodCall>) m.getArgs()[1];
- boolean onePhase = (Boolean) m.getArgs()[configuration.isNodeLockingOptimistic() ?
4 : 3];
-
// Is there a local transaction associated with GTX ?
Transaction ltx = txTable.getLocalTransaction(gtx);
@@ -250,7 +398,7 @@
}
else
{
- retval = handlePessimisticPrepare(ctx, m, gtx, modifications, onePhase,
ltx);
+ retval = handlePessimisticPrepare(ctx, ctx.getMethodCall(), gtx,
modifications, onePhase, ltx);
}
}
finally
@@ -263,8 +411,6 @@
return retval;
}
-
- // --------------------------------------------------------------
// handler methods.
// --------------------------------------------------------------
@@ -277,54 +423,61 @@
*/
private Object handleNonTxMethod(InvocationContext ctx) throws Throwable
{
- MethodCall m = ctx.getMethodCall();
- Transaction tx = ctx.getTransaction();
Object result;
- // if there is no current tx and we're using opt locking, we need to use an
implicit tx.
- boolean implicitTransaction = configuration.isNodeLockingOptimistic() && tx
== null;
- if (implicitTransaction)
- {
- tx = createLocalTx();
- // we need to attach this tx to the InvocationContext.
- ctx.setTransaction(tx);
- }
- if (tx != null)
- attachGlobalTransaction(ctx, tx, m);
-
- GlobalTransaction gtx = ctx.getGlobalTransaction();
-
try
{
- result = super.invoke(ctx);
+ MethodCall m = ctx.getMethodCall();
+ Transaction tx = ctx.getTransaction();
+ // if there is no current tx and we're using opt locking, we need to use an
implicit tx.
+ boolean implicitTransaction = configuration.isNodeLockingOptimistic() &&
tx == null;
if (implicitTransaction)
{
- copyInvocationScopeOptionsToTxScope(ctx);
- copyForcedCacheModeToTxScope(ctx);
- txManager.commit();
+ tx = createLocalTx();
+ // we need to attach this tx to the InvocationContext.
+ ctx.setTransaction(tx);
}
- }
- catch (Throwable t)
+ if (tx != null)
+ attachGlobalTransaction(ctx, tx, m);
+
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+
+ try
{
- if (implicitTransaction)
+ result = nextInterceptor(ctx);
+ if (implicitTransaction)
+ {
+ copyInvocationScopeOptionsToTxScope(ctx);
+ copyForcedCacheModeToTxScope(ctx);
+ txManager.commit();
+ }
+ }
+ catch (Throwable t)
{
- log.warn("Rolling back, exception encountered", t);
- result = t;
- try
+ if (implicitTransaction)
{
- setTransactionalContext(tx, gtx, ctx);
- txManager.rollback();
+ log.warn("Rolling back, exception encountered", t);
+ result = t;
+ try
+ {
+ setTransactionalContext(tx, gtx, ctx);
+ txManager.rollback();
+ }
+ catch (Throwable th)
+ {
+ log.warn("Roll back failed encountered", th);
+ }
}
- catch (Throwable th)
+ else
{
- log.warn("Roll back failed encountered", th);
+ throw t;
}
}
- else
- {
- throw t;
- }
+ return result;
+ } catch (Throwable throwable)
+ {
+ throwIfNeeded(ctx, throwable);
+ return null;
}
- return result;
}
/**
@@ -392,7 +545,7 @@
Object retval;
if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare "
+ gtx);
replayModifications(modifications, ctx, true);
- retval = super.invoke(ctx);
+ retval = nextInterceptor(ctx);
// JBCACHE-361 Confirm that the transaction is ACTIVE
if (!isActive(ltx))
{
@@ -413,13 +566,13 @@
try
{
replayModifications(modifications, ctx, false);
- if (isOnePhaseCommitPrepareMehod(m))
+ if (m.isOnePhaseCommitPrepareMehod())
{
log.trace("Using one-phase prepare. Not propagating the prepare call
up the stack until called to do so by the sync handler.");
}
else
{
- super.invoke(ctx);
+ nextInterceptor(ctx);
}
// JBCACHE-361 Confirm that the transaction is ACTIVE
@@ -487,7 +640,6 @@
finally
{
transactions.remove(ltx);// JBAS-298
- remoteTransactions.remove(gtx);// JBAS-308
}
}
}
@@ -529,7 +681,7 @@
ctx.setMethodCall(modification);
}
- retval = super.invoke(ctx);
+ retval = nextInterceptor(ctx);
if (!isActive(ctx.getTransaction()))
{
@@ -605,7 +757,7 @@
if (log.isDebugEnabled()) log.debug(" executing " + m + "() with
local TX " + ltx + " under global tx " + gtx);
// pass commit up the chain
- // super.invoke(ctx);
+ // nextInterceptor(ctx);
// commit or rollback the tx.
if (m.getMethodId() == MethodDeclarations.commitMethod_id)
{
@@ -639,7 +791,6 @@
}
// remove from local lists.
- remoteTransactions.remove(gtx);
transactions.remove(ltx);
// this tx has completed. Clean up in the tx table.
@@ -687,7 +838,7 @@
//if (!ltx.equals(currentTx)) throw new IllegalStateException(" local
transaction " + ltx + " transaction does not match running tx " +
currentTx);
- result = super.invoke(ctx);
+ result = nextInterceptor(ctx);
if (log.isDebugEnabled()) log.debug("Finished local commit/rollback method for
" + gtx);
return result;
@@ -851,7 +1002,7 @@
if (txManager.getTransaction() != null && ltx != null &&
txManager.getTransaction().equals(ltx))
{
ctx.setMethodCall(prepareMethod);
- result = super.invoke(ctx);
+ result = nextInterceptor(ctx);
}
else
{
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/UnlockInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/UnlockInterceptor.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/UnlockInterceptor.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -33,7 +33,7 @@
{
try
{
- return super.invoke(ctx);
+ return nextInterceptor(ctx);
}
catch (Throwable th)
{
Modified: core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java 2007-11-29 17:10:28 UTC
(rev 4791)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java 2007-11-29 22:44:11 UTC
(rev 4792)
@@ -109,7 +109,7 @@
attempted = true;
// Don't keep iterating due to the risk of
// ConcurrentModificationException if readers are removed
- // Just go back through our outer loop to get the next one
+ // Just go back through our outer loop to get the nextInterceptor one
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/lock/ReadWriteLockWithUpgrade.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/ReadWriteLockWithUpgrade.java 2007-11-29
17:10:28 UTC (rev 4791)
+++ core/trunk/src/main/java/org/jboss/cache/lock/ReadWriteLockWithUpgrade.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -47,7 +47,7 @@
* When one of the writer gets wake up, it will first check
* if upgrade counter is zero. If not, it will first release the semaphore so the
upgrade
* thread can grab it, check the semaphore is gone, do notify, and issue myself
another
- * acquire to grab the next available semaphore.</li>
+ * acquire to grab the nextInterceptor available semaphore.</li>
* </ul>
*
* @author Ben Wang
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/MethodCall.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/MethodCall.java 2007-11-29 17:10:28
UTC (rev 4791)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/MethodCall.java 2007-11-29 22:44:11
UTC (rev 4792)
@@ -37,6 +37,23 @@
// for serialization
}
+ /**
+ * This only works for prepare() and optimisticPrepare() method calls.
+ */
+ public boolean isOnePhaseCommitPrepareMehod()
+ {
+ switch (this.getMethodId())
+ {
+ case MethodDeclarations.prepareMethod_id:
+ return (Boolean) this.getArgs()[3];
+ case MethodDeclarations.optimisticPrepareMethod_id:
+ return (Boolean) this.getArgs()[4];
+ default:
+ return false;
+ }
+ }
+
+
protected MethodCall(Method method, Object... arguments)
{
super(method, arguments);
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/MethodDeclarations.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/MethodDeclarations.java 2007-11-29
17:10:28 UTC (rev 4791)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/MethodDeclarations.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -370,9 +370,6 @@
crudMethodIds.add(removeDataMethodLocal_id);
crudMethodIds.add(dataGravitationCleanupMethod_id);
crudMethodIds.add(moveMethodLocal_id);
- crudMethodIds.add(putDataVersionedMethodLocal_id);
- crudMethodIds.add(putDataEraseVersionedMethodLocal_id);
- crudMethodIds.add(putKeyValVersionedMethodLocal_id);
crudMethodIds.add(removeNodeVersionedMethodLocal_id);
crudMethodIds.add(removeKeyVersionedMethodLocal_id);
crudMethodIds.add(removeDataVersionedMethodLocal_id);
Modified:
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2007-11-29
17:10:28 UTC (rev 4791)
+++
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2007-11-29
22:44:11 UTC (rev 4792)
@@ -4,6 +4,7 @@
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;
+import org.testng.annotations.Test;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -23,6 +24,7 @@
/**
* @author xenephon
*/
+@Test(groups = {"functional", "transaction"})
public class AsyncFullStackInterceptorTest extends AbstractOptimisticTestCase
{