[jbosscache-commits] JBoss Cache SVN: r6529 - in core/trunk/src: main/java/org/jboss/cache/factories and 3 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Wed Aug 6 10:42:03 EDT 2008
Author: manik.surtani at jboss.com
Date: 2008-08-06 10:42:03 -0400 (Wed, 06 Aug 2008)
New Revision: 6529
Added:
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java
Removed:
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
Modified:
core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationConfigTest.java
core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java
Log:
Fixed broken data gravitation with MVCC
Modified: core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java 2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java 2008-08-06 14:42:03 UTC (rev 6529)
@@ -219,6 +219,26 @@
public List<NodeData> buildNodeData(List<NodeData> list, NodeSPI node, boolean mapSafe)
{
+ if (usingMvcc)
+ return buildNodeData(list, node.getDelegationTarget(), mapSafe);
+ else
+ return buildNodeDataLegacy(list, node, mapSafe);
+ }
+
+ private List<NodeData> buildNodeData(List<NodeData> list, InternalNode<?, ?> node, boolean mapSafe)
+ {
+ NodeData data = new NodeData(buddyFqnTransformer.getActualFqn(node.getFqn()), node.getData(), mapSafe);
+ list.add(data);
+ for (InternalNode childNode : node.getChildrenMap().values())
+ {
+ buildNodeData(list, childNode, true);
+ }
+ return list;
+ }
+
+ @Deprecated
+ private List<NodeData> buildNodeDataLegacy(List<NodeData> list, NodeSPI node, boolean mapSafe)
+ {
NodeData data = new NodeData(buddyFqnTransformer.getActualFqn(node.getFqn()), node.getDataDirect(), mapSafe);
list.add(data);
for (Object childNode : node.getChildrenDirect())
Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-08-06 14:42:03 UTC (rev 6529)
@@ -132,8 +132,14 @@
}
if (configuration.isUsingBuddyReplication())
- interceptorChain.appendIntereceptor(createInterceptor(DataGravitatorInterceptor.class));
+ {
+ if (configuration.getNodeLockingScheme() == NodeLockingScheme.MVCC)
+ interceptorChain.appendIntereceptor(createInterceptor(DataGravitatorInterceptor.class));
+ else
+ interceptorChain.appendIntereceptor(createInterceptor(LegacyDataGravitatorInterceptor.class));
+ }
+
if (optimistic)
{
interceptorChain.appendIntereceptor(createInterceptor(OptimisticLockingInterceptor.class));
Deleted: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-08-06 14:42:03 UTC (rev 6529)
@@ -1,439 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.cache.interceptors;
-
-import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.DataContainer;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
-import org.jboss.cache.buddyreplication.BuddyManager;
-import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.commands.CommandsFactory;
-import org.jboss.cache.commands.DataCommand;
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.read.ExistsCommand;
-import org.jboss.cache.commands.read.GetChildrenNamesCommand;
-import org.jboss.cache.commands.read.GetDataMapCommand;
-import org.jboss.cache.commands.read.GetKeyValueCommand;
-import org.jboss.cache.commands.read.GetKeysCommand;
-import org.jboss.cache.commands.read.GetNodeCommand;
-import org.jboss.cache.commands.read.GravitateDataCommand;
-import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
-import org.jboss.cache.commands.tx.CommitCommand;
-import org.jboss.cache.commands.tx.RollbackCommand;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.invocation.InvocationContext;
-import org.jboss.cache.marshall.NodeData;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jgroups.Address;
-import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.RspFilter;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The Data Gravitator interceptor intercepts cache misses and attempts to
- * gravitate data from other parts of the cluster.
- * <p/>
- * Only used if Buddy Replication is enabled. Also, the interceptor only kicks
- * in if an {@link org.jboss.cache.config.Option} is passed in to force Data
- * Gravitation for a specific invocation or if <b>autoDataGravitation</b> is
- * set to <b>true</b> when configuring Buddy Replication.
- * <p/>
- * See the JBoss Cache User Guide for more details on configuration options.
- * There is a section dedicated to Buddy Replication in the Replication
- * chapter.
- * <p/>
- * In terms of functionality, if a gravitation call has occured and a cleanup call is needed (based on
- * how BR is configured), a cleanup call will be broadcast immediately after the gravitation call (no txs)
- * or if txs are used, an <i>asynchronous</i> call is made to perform the cleanup <i>outside</i> the scope
- * of the tx that caused the gravitation event.
- * <p/>
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- */
-public class DataGravitatorInterceptor extends BaseRpcInterceptor
-{
- private BuddyManager buddyManager;
- /**
- * Map that contains commands that need cleaning up. This is keyed on global transaction, and contains a list of
- * cleanup commands corresponding to all gravitate calls made during the course of the transaction in question.
- */
- private Map<GlobalTransaction, List<ReplicableCommand>> cleanupCommands = new ConcurrentHashMap<GlobalTransaction, List<ReplicableCommand>>();
- private DataContainer dataContainer;
- private CommandsFactory commandsFactory;
- private CacheSPI cacheSPI;
- private BuddyFqnTransformer buddyFqnTransformer;
-
- @Inject
- public void injectComponents(BuddyManager buddyManager, DataContainer dataContainer, CommandsFactory commandsFactory, CacheSPI cacheSPI, BuddyFqnTransformer transformer)
- {
- this.buddyManager = buddyManager;
- this.dataContainer = dataContainer;
- this.commandsFactory = commandsFactory;
- this.cacheSPI = cacheSPI;
- buddyFqnTransformer = transformer;
- }
-
- @Override
- public Object visitGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
- {
- return handleGetMethod(ctx, command);
- }
-
- @Override
- public Object visitGetDataMapCommand(InvocationContext ctx, GetDataMapCommand command) throws Throwable
- {
- return handleGetMethod(ctx, command);
- }
-
- @Override
- public Object visitExistsNodeCommand(InvocationContext ctx, ExistsCommand command) throws Throwable
- {
- return handleGetMethod(ctx, command);
- }
-
- @Override
- public Object visitGetKeysCommand(InvocationContext ctx, GetKeysCommand command) throws Throwable
- {
- return handleGetMethod(ctx, command);
- }
-
- @Override
- public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
- {
- return handleGetMethod(ctx, command);
- }
-
- @Override
- public Object visitGetNodeCommand(InvocationContext ctx, GetNodeCommand command) throws Throwable
- {
- return handleGetMethod(ctx, command);
- }
-
- @Override
- public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
- {
- try
- {
- return invokeNextInterceptor(ctx, command);
- }
- finally
- {
- cleanupCommands.remove(ctx.getGlobalTransaction());
- }
- }
-
- @Override
- public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
- {
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- try
- {
- doCommit(gtx);
- return invokeNextInterceptor(ctx, command);
- }
- finally
- {
- cleanupCommands.remove(gtx);
- }
- }
-
- private Object handleGetMethod(InvocationContext ctx, DataCommand command) throws Throwable
- {
- if (isGravitationEnabled(ctx))
- {
- // test that the Fqn being requested exists locally in the cache.
- if (trace) log.trace("Checking local existence of requested fqn " + command.getFqn());
- if (buddyFqnTransformer.isBackupFqn(command.getFqn()))
- {
- log.info("Is call for a backup Fqn, not performing any gravitation. Direct calls on internal backup nodes are *not* supported.");
- }
- else
- {
- if (!dataContainer.exists(command.getFqn()))
- {
- // gravitation is necessary.
-
- if (trace) log.trace("Gravitating from local backup tree");
- BackupData data = localBackupGet(command.getFqn(), ctx);
-
- if (data == null)
- {
- if (trace) log.trace("Gravitating from remote backup tree");
- // gravitate remotely.
- data = remoteBackupGet(command.getFqn());
- }
-
- if (data != null)
- {
- if (trace)
- log.trace("Passing the put call locally to make sure state is persisted and ownership is correctly established.");
- // store the gravitated node locally. This will cause it being backed up in the current instance's buddy.
- createNode(data.backupData);
-
- cleanBackupData(data, ctx);
- }
- }
- else
- {
- if (trace) log.trace("No need to gravitate; have this already.");
- }
- }
- }
- else
- {
- if (trace)
- {
- log.trace("Suppressing data gravitation for this call.");
- }
- }
- return invokeNextInterceptor(ctx, command);
- }
-
- private boolean isGravitationEnabled(InvocationContext ctx)
- {
- boolean enabled = ctx.isOriginLocal();
- if (enabled)
- {
- if (!buddyManager.isAutoDataGravitation())
- {
- enabled = ctx.getOptionOverrides().getForceDataGravitation();
- }
- }
- return enabled;
- }
-
- private void doCommit(GlobalTransaction gtx) throws Throwable
- {
- if (cleanupCommands.containsKey(gtx))
- {
- if (trace) log.trace("Broadcasting cleanup commands for gtx " + gtx);
-
- for (ReplicableCommand command : cleanupCommands.get(gtx))
- {
- try
- {
- doCleanup(command);
- }
- catch (Throwable th)
- {
- log.warn("Problems performing gravitation cleanup. Cleanup command: " + command, th);
- }
- }
- }
- else
- {
- if (trace) log.trace("No cleanups to broadcast in commit phase for gtx " + gtx);
- }
- }
-
- private BackupData remoteBackupGet(Fqn name) throws Exception
- {
- BackupData result = null;
- GravitateResult gr = gravitateData(name);
- if (gr.isDataFound())
- {
- if (trace)
- {
- log.trace("Got response " + gr);
- }
-
- result = new BackupData(name, gr);
- }
- return result;
- }
-
- private void cleanBackupData(BackupData backup, InvocationContext ctx) throws Throwable
- {
-
- DataGravitationCleanupCommand cleanupCommand = commandsFactory.buildDataGravitationCleanupCommand(backup.primaryFqn, backup.backupFqn);
- GlobalTransaction gtx = ctx.getGlobalTransaction();
-
- if (gtx == null)
- {
- // broadcast removes
- // remove main Fqn
- if (trace) log.trace("Performing cleanup on [" + backup.primaryFqn + "]");
- // remove backup Fqn
- doCleanup(cleanupCommand);
- }
- else
- {
- if (trace)
- log.trace("Data gravitation performed under global transaction " + gtx + ". Not broadcasting cleanups until the tx commits. Recording cleanup command for later use.");
- List<ReplicableCommand> commands;
- if (cleanupCommands.containsKey(gtx))
- {
- commands = cleanupCommands.get(gtx);
- }
- else
- {
- commands = new LinkedList<ReplicableCommand>();
- }
-
- commands.add(cleanupCommand);
- cleanupCommands.put(gtx, commands);
- }
- }
-
- private void doCleanup(ReplicableCommand cleanupCommand) throws Throwable
- {
- // cleanup commands are always ASYNCHRONOUS and is broadcast to *everyone* (even members of the current buddy
- // group as they may be members of > 1 buddy group)
- replicateCall(null, cleanupCommand, false, false, false, true, -1);
- }
-
- @SuppressWarnings("deprecation")
- private GravitateResult gravitateData(Fqn fqn) throws Exception
- {
- if (trace) log.trace("Requesting data gravitation for Fqn " + fqn);
-
- List<Address> mbrs = rpcManager.getMembers();
- Boolean searchSubtrees = buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE;
- GravitateDataCommand command = commandsFactory.buildGravitateDataCommand(fqn, searchSubtrees);
- // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
- // not have either the primary OR backup, and stop polling other valid nodes.
- List resps = rpcManager.callRemoteMethods(null, command, GroupRequest.GET_ALL, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, rpcManager.getLocalAddress()), false);
-
- if (trace) log.trace("got responses " + resps);
-
- if (resps == null)
- {
- if (mbrs.size() > 1) log.error("No replies to call " + command);
- return GravitateResult.noDataFound();
- }
-
- // test for and remove exceptions
- GravitateResult result = GravitateResult.noDataFound();
- for (Object o : resps)
- {
- if (o instanceof Throwable)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Found remote Throwable among responses - removing from responses list", (Exception) o);
- }
- }
- else if (o != null)
- {
- result = (GravitateResult) o;
- if (result.isDataFound())
- {
- break;
- }
- }
- else if (!configuration.isUseRegionBasedMarshalling())
- {
- // Null is OK if we are using region based marshalling; it
- // is what is returned if a region is inactive. Otherwise
- // getting a null is an error condition
- log.error("Unexpected null response to call " + command + ".");
- }
-
- }
-
- return result;
- }
-
- @SuppressWarnings("unchecked")
- private void createNode(List<NodeData> nodeData) throws CacheException
- {
- for (NodeData data : nodeData)
- {
- cacheSPI.put(data.getFqn(), data.getAttributes());
- }
- }
-
- private BackupData localBackupGet(Fqn fqn, InvocationContext ctx) throws CacheException
- {
- GravitateResult result = cacheSPI.gravitateData(fqn, true, ctx);// a "local" gravitation
- boolean found = result.isDataFound();
- BackupData data = null;
-
- if (found)
- {
- Fqn backupFqn = result.getBuddyBackupFqn();
- data = new BackupData(fqn, result);
- // now the cleanup
- if (buddyManager.isDataGravitationRemoveOnFind())
- {
- // Remove locally only; the remote call will
- // be broadcast later
- ctx.getOptionOverrides().setCacheModeLocal(true);
- cacheSPI.removeNode(backupFqn);
- }
- else
- {
- cacheSPI.evict(backupFqn, true);
- }
- }
-
- if (trace) log.trace("Retrieved data " + data + " found = " + found);
- return data;
- }
-
- private static class BackupData
- {
- Fqn primaryFqn;
- Fqn backupFqn;
- List<NodeData> backupData;
-
- public BackupData(Fqn primaryFqn, GravitateResult gr)
- {
- this.primaryFqn = primaryFqn;
- this.backupFqn = gr.getBuddyBackupFqn();
- this.backupData = gr.getNodeData();
- }
-
- public String toString()
- {
- return "BackupData{" +
- "primaryFqn=" + primaryFqn +
- ", backupFqn=" + backupFqn +
- ", backupData=" + backupData +
- '}';
- }
- }
-
- public static class ResponseValidityFilter implements RspFilter
- {
- private int numValidResponses = 0;
- private List<Address> pendingResponders;
-
- public ResponseValidityFilter(List<Address> expected, Address localAddress)
- {
- // so for now I used a list to keep it consistent
- this.pendingResponders = new ArrayList<Address>(expected);
- // We'll never get a response from ourself
- this.pendingResponders.remove(localAddress);
- }
-
- public boolean isAcceptable(Object object, Address address)
- {
- pendingResponders.remove(address);
-
- if (object instanceof GravitateResult)
- {
- GravitateResult response = (GravitateResult) object;
- if (response.isDataFound()) numValidResponses++;
- }
- // always return true to make sure a response is logged by the JGroups RpcDispatcher.
- return true;
- }
-
- public boolean needMoreResponses()
- {
- return numValidResponses < 1 && pendingResponders.size() > 0;
- }
- }
-}
Added: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-08-06 14:42:03 UTC (rev 6529)
@@ -0,0 +1,29 @@
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.Fqn;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.mvcc.MVCCNodeHelper;
+
+/**
+ * MVCC specific version of the LegacyDataGravitatorInterceptor
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class DataGravitatorInterceptor extends LegacyDataGravitatorInterceptor
+{
+ MVCCNodeHelper helper;
+
+ @Inject
+ public void injectMvccNodeHelper(MVCCNodeHelper helper)
+ {
+ this.helper = helper;
+ }
+
+ @Override
+ protected void wrapIfNeeded(InvocationContext ctx, Fqn fqnToWrap) throws InterruptedException
+ {
+ helper.wrapNodeForReading(ctx, fqnToWrap);
+ }
+}
Copied: core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java (from rev 6527, core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java 2008-08-06 14:42:03 UTC (rev 6529)
@@ -0,0 +1,446 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DataContainer;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
+import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.buddyreplication.GravitateResult;
+import org.jboss.cache.commands.CommandsFactory;
+import org.jboss.cache.commands.DataCommand;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.read.ExistsCommand;
+import org.jboss.cache.commands.read.GetChildrenNamesCommand;
+import org.jboss.cache.commands.read.GetDataMapCommand;
+import org.jboss.cache.commands.read.GetKeyValueCommand;
+import org.jboss.cache.commands.read.GetKeysCommand;
+import org.jboss.cache.commands.read.GetNodeCommand;
+import org.jboss.cache.commands.read.GravitateDataCommand;
+import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.RollbackCommand;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.marshall.NodeData;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jgroups.Address;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RspFilter;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The Data Gravitator interceptor intercepts cache misses and attempts to
+ * gravitate data from other parts of the cluster.
+ * <p/>
+ * Only used if Buddy Replication is enabled. Also, the interceptor only kicks
+ * in if an {@link org.jboss.cache.config.Option} is passed in to force Data
+ * Gravitation for a specific invocation or if <b>autoDataGravitation</b> is
+ * set to <b>true</b> when configuring Buddy Replication.
+ * <p/>
+ * See the JBoss Cache User Guide for more details on configuration options.
+ * There is a section dedicated to Buddy Replication in the Replication
+ * chapter.
+ * <p/>
+ * In terms of functionality, if a gravitation call has occured and a cleanup call is needed (based on
+ * how BR is configured), a cleanup call will be broadcast immediately after the gravitation call (no txs)
+ * or if txs are used, an <i>asynchronous</i> call is made to perform the cleanup <i>outside</i> the scope
+ * of the tx that caused the gravitation event.
+ * <p/>
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ * @deprecated will be removed with optimistic and pessimistic locking.
+ */
+@Deprecated
+public class LegacyDataGravitatorInterceptor extends BaseRpcInterceptor
+{
+ private BuddyManager buddyManager;
+ /**
+ * Map that contains commands that need cleaning up. This is keyed on global transaction, and contains a list of
+ * cleanup commands corresponding to all gravitate calls made during the course of the transaction in question.
+ */
+ private Map<GlobalTransaction, List<ReplicableCommand>> cleanupCommands = new ConcurrentHashMap<GlobalTransaction, List<ReplicableCommand>>();
+ private DataContainer dataContainer;
+ private CommandsFactory commandsFactory;
+ private CacheSPI cacheSPI;
+ private BuddyFqnTransformer buddyFqnTransformer;
+
+ @Inject
+ public void injectComponents(BuddyManager buddyManager, DataContainer dataContainer, CommandsFactory commandsFactory, CacheSPI cacheSPI, BuddyFqnTransformer transformer)
+ {
+ this.buddyManager = buddyManager;
+ this.dataContainer = dataContainer;
+ this.commandsFactory = commandsFactory;
+ this.cacheSPI = cacheSPI;
+ buddyFqnTransformer = transformer;
+ }
+
+ @Override
+ public Object visitGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
+ {
+ return handleGetMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitGetDataMapCommand(InvocationContext ctx, GetDataMapCommand command) throws Throwable
+ {
+ return handleGetMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitExistsNodeCommand(InvocationContext ctx, ExistsCommand command) throws Throwable
+ {
+ return handleGetMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitGetKeysCommand(InvocationContext ctx, GetKeysCommand command) throws Throwable
+ {
+ return handleGetMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
+ {
+ return handleGetMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitGetNodeCommand(InvocationContext ctx, GetNodeCommand command) throws Throwable
+ {
+ return handleGetMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+ {
+ try
+ {
+ return invokeNextInterceptor(ctx, command);
+ }
+ finally
+ {
+ cleanupCommands.remove(ctx.getGlobalTransaction());
+ }
+ }
+
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ try
+ {
+ doCommit(gtx);
+ return invokeNextInterceptor(ctx, command);
+ }
+ finally
+ {
+ cleanupCommands.remove(gtx);
+ }
+ }
+
+ private Object handleGetMethod(InvocationContext ctx, DataCommand command) throws Throwable
+ {
+ if (isGravitationEnabled(ctx))
+ {
+ // test that the Fqn being requested exists locally in the cache.
+ if (trace) log.trace("Checking local existence of requested fqn " + command.getFqn());
+ if (buddyFqnTransformer.isBackupFqn(command.getFqn()))
+ {
+ log.info("Is call for a backup Fqn, not performing any gravitation. Direct calls on internal backup nodes are *not* supported.");
+ }
+ else
+ {
+ if (!dataContainer.exists(command.getFqn()))
+ {
+ // gravitation is necessary.
+
+ if (trace) log.trace("Gravitating from local backup tree");
+ BackupData data = localBackupGet(command.getFqn(), ctx);
+
+ if (data == null)
+ {
+ if (trace) log.trace("Gravitating from remote backup tree");
+ // gravitate remotely.
+ data = remoteBackupGet(command.getFqn());
+ }
+
+ if (data != null)
+ {
+ if (trace)
+ log.trace("Passing the put call locally to make sure state is persisted and ownership is correctly established.");
+ // store the gravitated node locally. This will cause it being backed up in the current instance's buddy.
+ createNode(data.backupData);
+ cleanBackupData(data, ctx);
+ wrapIfNeeded(ctx, data.primaryFqn);
+ }
+ }
+ else
+ {
+ if (trace) log.trace("No need to gravitate; have this already.");
+ }
+ }
+ }
+ else
+ {
+ if (trace)
+ {
+ log.trace("Suppressing data gravitation for this call.");
+ }
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ protected void wrapIfNeeded(InvocationContext ctx, Fqn fqnToWrap) throws InterruptedException
+ {
+ // no op
+ }
+
+ private boolean isGravitationEnabled(InvocationContext ctx)
+ {
+ boolean enabled = ctx.isOriginLocal();
+ if (enabled)
+ {
+ if (!buddyManager.isAutoDataGravitation())
+ {
+ enabled = ctx.getOptionOverrides().getForceDataGravitation();
+ }
+ }
+ return enabled;
+ }
+
+ private void doCommit(GlobalTransaction gtx) throws Throwable
+ {
+ if (cleanupCommands.containsKey(gtx))
+ {
+ if (trace) log.trace("Broadcasting cleanup commands for gtx " + gtx);
+
+ for (ReplicableCommand command : cleanupCommands.get(gtx))
+ {
+ try
+ {
+ doCleanup(command);
+ }
+ catch (Throwable th)
+ {
+ log.warn("Problems performing gravitation cleanup. Cleanup command: " + command, th);
+ }
+ }
+ }
+ else
+ {
+ if (trace) log.trace("No cleanups to broadcast in commit phase for gtx " + gtx);
+ }
+ }
+
+ private BackupData remoteBackupGet(Fqn name) throws Exception
+ {
+ BackupData result = null;
+ GravitateResult gr = gravitateData(name);
+ if (gr.isDataFound())
+ {
+ if (trace)
+ {
+ log.trace("Got response " + gr);
+ }
+
+ result = new BackupData(name, gr);
+ }
+ return result;
+ }
+
+ private void cleanBackupData(BackupData backup, InvocationContext ctx) throws Throwable
+ {
+
+ DataGravitationCleanupCommand cleanupCommand = commandsFactory.buildDataGravitationCleanupCommand(backup.primaryFqn, backup.backupFqn);
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+
+ if (gtx == null)
+ {
+ // broadcast removes
+ // remove main Fqn
+ if (trace) log.trace("Performing cleanup on [" + backup.primaryFqn + "]");
+ // remove backup Fqn
+ doCleanup(cleanupCommand);
+ }
+ else
+ {
+ if (trace)
+ log.trace("Data gravitation performed under global transaction " + gtx + ". Not broadcasting cleanups until the tx commits. Recording cleanup command for later use.");
+ List<ReplicableCommand> commands;
+ if (cleanupCommands.containsKey(gtx))
+ {
+ commands = cleanupCommands.get(gtx);
+ }
+ else
+ {
+ commands = new LinkedList<ReplicableCommand>();
+ }
+
+ commands.add(cleanupCommand);
+ cleanupCommands.put(gtx, commands);
+ }
+ }
+
+ private void doCleanup(ReplicableCommand cleanupCommand) throws Throwable
+ {
+ // cleanup commands are always ASYNCHRONOUS and is broadcast to *everyone* (even members of the current buddy
+ // group as they may be members of > 1 buddy group)
+ replicateCall(null, cleanupCommand, false, false, false, true, -1);
+ }
+
+ @SuppressWarnings("deprecation")
+ private GravitateResult gravitateData(Fqn fqn) throws Exception
+ {
+ if (trace) log.trace("Requesting data gravitation for Fqn " + fqn);
+
+ List<Address> mbrs = rpcManager.getMembers();
+ Boolean searchSubtrees = buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE;
+ GravitateDataCommand command = commandsFactory.buildGravitateDataCommand(fqn, searchSubtrees);
+ // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
+ // not have either the primary OR backup, and stop polling other valid nodes.
+ List resps = rpcManager.callRemoteMethods(null, command, GroupRequest.GET_ALL, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, rpcManager.getLocalAddress()), false);
+
+ if (trace) log.trace("got responses " + resps);
+
+ if (resps == null)
+ {
+ if (mbrs.size() > 1) log.error("No replies to call " + command);
+ return GravitateResult.noDataFound();
+ }
+
+ // test for and remove exceptions
+ GravitateResult result = GravitateResult.noDataFound();
+ for (Object o : resps)
+ {
+ if (o instanceof Throwable)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug("Found remote Throwable among responses - removing from responses list", (Exception) o);
+ }
+ }
+ else if (o != null)
+ {
+ result = (GravitateResult) o;
+ if (result.isDataFound())
+ {
+ break;
+ }
+ }
+ else if (!configuration.isUseRegionBasedMarshalling())
+ {
+ // Null is OK if we are using region based marshalling; it
+ // is what is returned if a region is inactive. Otherwise
+ // getting a null is an error condition
+ log.error("Unexpected null response to call " + command + ".");
+ }
+
+ }
+
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void createNode(List<NodeData> nodeData) throws CacheException
+ {
+ for (NodeData data : nodeData)
+ {
+ cacheSPI.put(data.getFqn(), data.getAttributes());
+ }
+ }
+
+ private BackupData localBackupGet(Fqn fqn, InvocationContext ctx) throws CacheException
+ {
+ GravitateResult result = cacheSPI.gravitateData(fqn, true, ctx);// a "local" gravitation
+ boolean found = result.isDataFound();
+ BackupData data = null;
+
+ if (found)
+ {
+ Fqn backupFqn = result.getBuddyBackupFqn();
+ data = new BackupData(fqn, result);
+ // now the cleanup
+ if (buddyManager.isDataGravitationRemoveOnFind())
+ {
+ // Remove locally only; the remote call will
+ // be broadcast later
+ ctx.getOptionOverrides().setCacheModeLocal(true);
+ cacheSPI.removeNode(backupFqn);
+ }
+ else
+ {
+ cacheSPI.evict(backupFqn, true);
+ }
+ }
+
+ if (trace) log.trace("Retrieved data " + data + " found = " + found);
+ return data;
+ }
+
+ private static class BackupData
+ {
+ Fqn primaryFqn;
+ Fqn backupFqn;
+ List<NodeData> backupData;
+
+ public BackupData(Fqn primaryFqn, GravitateResult gr)
+ {
+ this.primaryFqn = primaryFqn;
+ this.backupFqn = gr.getBuddyBackupFqn();
+ this.backupData = gr.getNodeData();
+ }
+
+ public String toString()
+ {
+ return "BackupData{" +
+ "primaryFqn=" + primaryFqn +
+ ", backupFqn=" + backupFqn +
+ ", backupData=" + backupData +
+ '}';
+ }
+ }
+
+ public static class ResponseValidityFilter implements RspFilter
+ {
+ private int numValidResponses = 0;
+ private List<Address> pendingResponders;
+
+ public ResponseValidityFilter(List<Address> expected, Address localAddress)
+ {
+ // so for now I used a list to keep it consistent
+ this.pendingResponders = new ArrayList<Address>(expected);
+ // We'll never get a response from ourself
+ this.pendingResponders.remove(localAddress);
+ }
+
+ public boolean isAcceptable(Object object, Address address)
+ {
+ pendingResponders.remove(address);
+
+ if (object instanceof GravitateResult)
+ {
+ GravitateResult response = (GravitateResult) object;
+ if (response.isDataFound()) numValidResponses++;
+ }
+ // always return true to make sure a response is logged by the JGroups RpcDispatcher.
+ return true;
+ }
+
+ public boolean needMoreResponses()
+ {
+ return numValidResponses < 1 && pendingResponders.size() > 0;
+ }
+ }
+}
Property changes on: core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationConfigTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationConfigTest.java 2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationConfigTest.java 2008-08-06 14:42:03 UTC (rev 6529)
@@ -14,7 +14,7 @@
import org.jboss.cache.config.parsing.XmlConfigHelper;
import org.jboss.cache.config.parsing.XmlConfigurationParser;
import org.jboss.cache.config.parsing.element.BuddyElementParser;
-import org.jboss.cache.interceptors.DataGravitatorInterceptor;
+import org.jboss.cache.interceptors.LegacyDataGravitatorInterceptor;
import org.jboss.cache.interceptors.base.CommandInterceptor;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
@@ -102,7 +102,7 @@
boolean hasDG = false;
for (CommandInterceptor interceptor : cache.getInterceptorChain())
{
- hasDG = hasDG || (interceptor instanceof DataGravitatorInterceptor);
+ hasDG = hasDG || (interceptor instanceof LegacyDataGravitatorInterceptor);
}
System.out.println(cache.getInterceptorChain());
Modified: core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java 2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java 2008-08-06 14:42:03 UTC (rev 6529)
@@ -503,7 +503,7 @@
assertEquals(OptimisticTxInterceptor.class, interceptors.next().getClass());
assertEquals(NotificationInterceptor.class, interceptors.next().getClass());
assertEquals(OptimisticReplicationInterceptor.class, interceptors.next().getClass());
- assertEquals(DataGravitatorInterceptor.class, interceptors.next().getClass());
+ assertEquals(LegacyDataGravitatorInterceptor.class, interceptors.next().getClass());
assertEquals(OptimisticLockingInterceptor.class, interceptors.next().getClass());
assertEquals(OptimisticValidatorInterceptor.class, interceptors.next().getClass());
assertEquals(OptimisticCreateIfNotExistsInterceptor.class, interceptors.next().getClass());
@@ -534,7 +534,7 @@
assertEquals(NotificationInterceptor.class, interceptors.next().getClass());
assertEquals(ReplicationInterceptor.class, interceptors.next().getClass());
assertEquals(PessimisticLockInterceptor.class, interceptors.next().getClass());
- assertEquals(DataGravitatorInterceptor.class, interceptors.next().getClass());
+ assertEquals(LegacyDataGravitatorInterceptor.class, interceptors.next().getClass());
assertEquals(CallInterceptor.class, interceptors.next().getClass());
assertInterceptorLinkage(list);
More information about the jbosscache-commits
mailing list