[jbosscache-commits] JBoss Cache SVN: r6529 - in core/trunk/src: main/java/org/jboss/cache/factories and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Aug 6 10:42:03 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-08-06 10:42:03 -0400 (Wed, 06 Aug 2008)
New Revision: 6529

Added:
   core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java
Removed:
   core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
   core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationConfigTest.java
   core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java
Log:
Fixed broken data gravitation with MVCC

Modified: core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java	2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java	2008-08-06 14:42:03 UTC (rev 6529)
@@ -219,6 +219,26 @@
 
    public List<NodeData> buildNodeData(List<NodeData> list, NodeSPI node, boolean mapSafe)
    {
+      if (usingMvcc)
+         return buildNodeData(list, node.getDelegationTarget(), mapSafe);
+      else
+         return buildNodeDataLegacy(list, node, mapSafe);
+   }
+
+   private List<NodeData> buildNodeData(List<NodeData> list, InternalNode<?, ?> node, boolean mapSafe)
+   {
+      NodeData data = new NodeData(buddyFqnTransformer.getActualFqn(node.getFqn()), node.getData(), mapSafe);
+      list.add(data);
+      for (InternalNode childNode : node.getChildrenMap().values())
+      {
+         buildNodeData(list, childNode, true);
+      }
+      return list;
+   }
+
+   @Deprecated
+   private List<NodeData> buildNodeDataLegacy(List<NodeData> list, NodeSPI node, boolean mapSafe)
+   {
       NodeData data = new NodeData(buddyFqnTransformer.getActualFqn(node.getFqn()), node.getDataDirect(), mapSafe);
       list.add(data);
       for (Object childNode : node.getChildrenDirect())

Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java	2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java	2008-08-06 14:42:03 UTC (rev 6529)
@@ -132,8 +132,14 @@
       }
 
       if (configuration.isUsingBuddyReplication())
-         interceptorChain.appendIntereceptor(createInterceptor(DataGravitatorInterceptor.class));
+      {
+         if (configuration.getNodeLockingScheme() == NodeLockingScheme.MVCC)
+            interceptorChain.appendIntereceptor(createInterceptor(DataGravitatorInterceptor.class));
+         else
+            interceptorChain.appendIntereceptor(createInterceptor(LegacyDataGravitatorInterceptor.class));
+      }
 
+
       if (optimistic)
       {
          interceptorChain.appendIntereceptor(createInterceptor(OptimisticLockingInterceptor.class));

Deleted: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-08-06 14:42:03 UTC (rev 6529)
@@ -1,439 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-package org.jboss.cache.interceptors;
-
-import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.DataContainer;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
-import org.jboss.cache.buddyreplication.BuddyManager;
-import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.commands.CommandsFactory;
-import org.jboss.cache.commands.DataCommand;
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.read.ExistsCommand;
-import org.jboss.cache.commands.read.GetChildrenNamesCommand;
-import org.jboss.cache.commands.read.GetDataMapCommand;
-import org.jboss.cache.commands.read.GetKeyValueCommand;
-import org.jboss.cache.commands.read.GetKeysCommand;
-import org.jboss.cache.commands.read.GetNodeCommand;
-import org.jboss.cache.commands.read.GravitateDataCommand;
-import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
-import org.jboss.cache.commands.tx.CommitCommand;
-import org.jboss.cache.commands.tx.RollbackCommand;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.invocation.InvocationContext;
-import org.jboss.cache.marshall.NodeData;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jgroups.Address;
-import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.RspFilter;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The Data Gravitator interceptor intercepts cache misses and attempts to
- * gravitate data from other parts of the cluster.
- * <p/>
- * Only used if Buddy Replication is enabled.  Also, the interceptor only kicks
- * in if an {@link org.jboss.cache.config.Option} is passed in to force Data
- * Gravitation for a specific invocation or if <b>autoDataGravitation</b> is
- * set to <b>true</b> when configuring Buddy Replication.
- * <p/>
- * See the JBoss Cache User Guide for more details on configuration options.
- * There is a section dedicated to Buddy Replication in the Replication
- * chapter.
- * <p/>
- * In terms of functionality, if a gravitation call has occured and a cleanup call is needed (based on
- * how BR is configured), a cleanup call will be broadcast immediately after the gravitation call (no txs)
- * or if txs are used, an <i>asynchronous</i> call is made to perform the cleanup <i>outside</i> the scope
- * of the tx that caused the gravitation event.
- * <p/>
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- */
-public class DataGravitatorInterceptor extends BaseRpcInterceptor
-{
-   private BuddyManager buddyManager;
-   /**
-    * Map that contains commands that need cleaning up.  This is keyed on global transaction, and contains a list of
-    * cleanup commands corresponding to all gravitate calls made during the course of the transaction in question.
-    */
-   private Map<GlobalTransaction, List<ReplicableCommand>> cleanupCommands = new ConcurrentHashMap<GlobalTransaction, List<ReplicableCommand>>();
-   private DataContainer dataContainer;
-   private CommandsFactory commandsFactory;
-   private CacheSPI cacheSPI;
-   private BuddyFqnTransformer buddyFqnTransformer;
-
-   @Inject
-   public void injectComponents(BuddyManager buddyManager, DataContainer dataContainer, CommandsFactory commandsFactory, CacheSPI cacheSPI, BuddyFqnTransformer transformer)
-   {
-      this.buddyManager = buddyManager;
-      this.dataContainer = dataContainer;
-      this.commandsFactory = commandsFactory;
-      this.cacheSPI = cacheSPI;
-      buddyFqnTransformer = transformer;
-   }
-
-   @Override
-   public Object visitGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
-   {
-      return handleGetMethod(ctx, command);
-   }
-
-   @Override
-   public Object visitGetDataMapCommand(InvocationContext ctx, GetDataMapCommand command) throws Throwable
-   {
-      return handleGetMethod(ctx, command);
-   }
-
-   @Override
-   public Object visitExistsNodeCommand(InvocationContext ctx, ExistsCommand command) throws Throwable
-   {
-      return handleGetMethod(ctx, command);
-   }
-
-   @Override
-   public Object visitGetKeysCommand(InvocationContext ctx, GetKeysCommand command) throws Throwable
-   {
-      return handleGetMethod(ctx, command);
-   }
-
-   @Override
-   public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
-   {
-      return handleGetMethod(ctx, command);
-   }
-
-   @Override
-   public Object visitGetNodeCommand(InvocationContext ctx, GetNodeCommand command) throws Throwable
-   {
-      return handleGetMethod(ctx, command);
-   }
-
-   @Override
-   public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
-   {
-      try
-      {
-         return invokeNextInterceptor(ctx, command);
-      }
-      finally
-      {
-         cleanupCommands.remove(ctx.getGlobalTransaction());
-      }
-   }
-
-   @Override
-   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
-   {
-      GlobalTransaction gtx = ctx.getGlobalTransaction();
-      try
-      {
-         doCommit(gtx);
-         return invokeNextInterceptor(ctx, command);
-      }
-      finally
-      {
-         cleanupCommands.remove(gtx);
-      }
-   }
-
-   private Object handleGetMethod(InvocationContext ctx, DataCommand command) throws Throwable
-   {
-      if (isGravitationEnabled(ctx))
-      {
-         // test that the Fqn being requested exists locally in the cache.
-         if (trace) log.trace("Checking local existence of requested fqn " + command.getFqn());
-         if (buddyFqnTransformer.isBackupFqn(command.getFqn()))
-         {
-            log.info("Is call for a backup Fqn, not performing any gravitation.  Direct calls on internal backup nodes are *not* supported.");
-         }
-         else
-         {
-            if (!dataContainer.exists(command.getFqn()))
-            {
-               // gravitation is necessary.
-
-               if (trace) log.trace("Gravitating from local backup tree");
-               BackupData data = localBackupGet(command.getFqn(), ctx);
-
-               if (data == null)
-               {
-                  if (trace) log.trace("Gravitating from remote backup tree");
-                  // gravitate remotely.
-                  data = remoteBackupGet(command.getFqn());
-               }
-
-               if (data != null)
-               {
-                  if (trace)
-                     log.trace("Passing the put call locally to make sure state is persisted and ownership is correctly established.");
-                  // store the gravitated node locally.  This will cause it being backed up in the current instance's buddy.
-                  createNode(data.backupData);
-
-                  cleanBackupData(data, ctx);
-               }
-            }
-            else
-            {
-               if (trace) log.trace("No need to gravitate; have this already.");
-            }
-         }
-      }
-      else
-      {
-         if (trace)
-         {
-            log.trace("Suppressing data gravitation for this call.");
-         }
-      }
-      return invokeNextInterceptor(ctx, command);
-   }
-
-   private boolean isGravitationEnabled(InvocationContext ctx)
-   {
-      boolean enabled = ctx.isOriginLocal();
-      if (enabled)
-      {
-         if (!buddyManager.isAutoDataGravitation())
-         {
-            enabled = ctx.getOptionOverrides().getForceDataGravitation();
-         }
-      }
-      return enabled;
-   }
-
-   private void doCommit(GlobalTransaction gtx) throws Throwable
-   {
-      if (cleanupCommands.containsKey(gtx))
-      {
-         if (trace) log.trace("Broadcasting cleanup commands for gtx " + gtx);
-
-         for (ReplicableCommand command : cleanupCommands.get(gtx))
-         {
-            try
-            {
-               doCleanup(command);
-            }
-            catch (Throwable th)
-            {
-               log.warn("Problems performing gravitation cleanup.  Cleanup command: " + command, th);
-            }
-         }
-      }
-      else
-      {
-         if (trace) log.trace("No cleanups to broadcast in commit phase for gtx " + gtx);
-      }
-   }
-
-   private BackupData remoteBackupGet(Fqn name) throws Exception
-   {
-      BackupData result = null;
-      GravitateResult gr = gravitateData(name);
-      if (gr.isDataFound())
-      {
-         if (trace)
-         {
-            log.trace("Got response " + gr);
-         }
-
-         result = new BackupData(name, gr);
-      }
-      return result;
-   }
-
-   private void cleanBackupData(BackupData backup, InvocationContext ctx) throws Throwable
-   {
-
-      DataGravitationCleanupCommand cleanupCommand = commandsFactory.buildDataGravitationCleanupCommand(backup.primaryFqn, backup.backupFqn);
-      GlobalTransaction gtx = ctx.getGlobalTransaction();
-
-      if (gtx == null)
-      {
-         // broadcast removes
-         // remove main Fqn
-         if (trace) log.trace("Performing cleanup on [" + backup.primaryFqn + "]");
-         // remove backup Fqn
-         doCleanup(cleanupCommand);
-      }
-      else
-      {
-         if (trace)
-            log.trace("Data gravitation performed under global transaction " + gtx + ".  Not broadcasting cleanups until the tx commits.  Recording cleanup command for later use.");
-         List<ReplicableCommand> commands;
-         if (cleanupCommands.containsKey(gtx))
-         {
-            commands = cleanupCommands.get(gtx);
-         }
-         else
-         {
-            commands = new LinkedList<ReplicableCommand>();
-         }
-
-         commands.add(cleanupCommand);
-         cleanupCommands.put(gtx, commands);
-      }
-   }
-
-   private void doCleanup(ReplicableCommand cleanupCommand) throws Throwable
-   {
-      // cleanup commands are always ASYNCHRONOUS and is broadcast to *everyone* (even members of the current buddy
-      // group as they may be members of > 1 buddy group)
-      replicateCall(null, cleanupCommand, false, false, false, true, -1);
-   }
-
-   @SuppressWarnings("deprecation")
-   private GravitateResult gravitateData(Fqn fqn) throws Exception
-   {
-      if (trace) log.trace("Requesting data gravitation for Fqn " + fqn);
-
-      List<Address> mbrs = rpcManager.getMembers();
-      Boolean searchSubtrees = buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE;
-      GravitateDataCommand command = commandsFactory.buildGravitateDataCommand(fqn, searchSubtrees);
-      // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
-      // not have either the primary OR backup, and stop polling other valid nodes.
-      List resps = rpcManager.callRemoteMethods(null, command, GroupRequest.GET_ALL, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, rpcManager.getLocalAddress()), false);
-
-      if (trace) log.trace("got responses " + resps);
-
-      if (resps == null)
-      {
-         if (mbrs.size() > 1) log.error("No replies to call " + command);
-         return GravitateResult.noDataFound();
-      }
-
-      // test for and remove exceptions
-      GravitateResult result = GravitateResult.noDataFound();
-      for (Object o : resps)
-      {
-         if (o instanceof Throwable)
-         {
-            if (log.isDebugEnabled())
-            {
-               log.debug("Found remote Throwable among responses - removing from responses list", (Exception) o);
-            }
-         }
-         else if (o != null)
-         {
-            result = (GravitateResult) o;
-            if (result.isDataFound())
-            {
-               break;
-            }
-         }
-         else if (!configuration.isUseRegionBasedMarshalling())
-         {
-            // Null is OK if we are using region based marshalling; it
-            // is what is returned if a region is inactive. Otherwise
-            // getting a null is an error condition
-            log.error("Unexpected null response to call " + command + ".");
-         }
-
-      }
-
-      return result;
-   }
-
-   @SuppressWarnings("unchecked")
-   private void createNode(List<NodeData> nodeData) throws CacheException
-   {
-      for (NodeData data : nodeData)
-      {
-         cacheSPI.put(data.getFqn(), data.getAttributes());
-      }
-   }
-
-   private BackupData localBackupGet(Fqn fqn, InvocationContext ctx) throws CacheException
-   {
-      GravitateResult result = cacheSPI.gravitateData(fqn, true, ctx);// a "local" gravitation
-      boolean found = result.isDataFound();
-      BackupData data = null;
-
-      if (found)
-      {
-         Fqn backupFqn = result.getBuddyBackupFqn();
-         data = new BackupData(fqn, result);
-         // now the cleanup
-         if (buddyManager.isDataGravitationRemoveOnFind())
-         {
-            // Remove locally only; the remote call will
-            // be broadcast later
-            ctx.getOptionOverrides().setCacheModeLocal(true);
-            cacheSPI.removeNode(backupFqn);
-         }
-         else
-         {
-            cacheSPI.evict(backupFqn, true);
-         }
-      }
-
-      if (trace) log.trace("Retrieved data " + data + " found = " + found);
-      return data;
-   }
-
-   private static class BackupData
-   {
-      Fqn primaryFqn;
-      Fqn backupFqn;
-      List<NodeData> backupData;
-
-      public BackupData(Fqn primaryFqn, GravitateResult gr)
-      {
-         this.primaryFqn = primaryFqn;
-         this.backupFqn = gr.getBuddyBackupFqn();
-         this.backupData = gr.getNodeData();
-      }
-
-      public String toString()
-      {
-         return "BackupData{" +
-               "primaryFqn=" + primaryFqn +
-               ", backupFqn=" + backupFqn +
-               ", backupData=" + backupData +
-               '}';
-      }
-   }
-
-   public static class ResponseValidityFilter implements RspFilter
-   {
-      private int numValidResponses = 0;
-      private List<Address> pendingResponders;
-
-      public ResponseValidityFilter(List<Address> expected, Address localAddress)
-      {
-         // so for now I used a list to keep it consistent
-         this.pendingResponders = new ArrayList<Address>(expected);
-         // We'll never get a response from ourself
-         this.pendingResponders.remove(localAddress);
-      }
-
-      public boolean isAcceptable(Object object, Address address)
-      {
-         pendingResponders.remove(address);
-
-         if (object instanceof GravitateResult)
-         {
-            GravitateResult response = (GravitateResult) object;
-            if (response.isDataFound()) numValidResponses++;
-         }
-         // always return true to make sure a response is logged by the JGroups RpcDispatcher.
-         return true;
-      }
-
-      public boolean needMoreResponses()
-      {
-         return numValidResponses < 1 && pendingResponders.size() > 0;
-      }
-   }
-}

Added: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-08-06 14:42:03 UTC (rev 6529)
@@ -0,0 +1,29 @@
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.Fqn;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.mvcc.MVCCNodeHelper;
+
+/**
+ * MVCC specific version of the LegacyDataGravitatorInterceptor
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class DataGravitatorInterceptor extends LegacyDataGravitatorInterceptor
+{
+   MVCCNodeHelper helper;
+
+   @Inject
+   public void injectMvccNodeHelper(MVCCNodeHelper helper)
+   {
+      this.helper = helper;
+   }
+
+   @Override
+   protected void wrapIfNeeded(InvocationContext ctx, Fqn fqnToWrap) throws InterruptedException
+   {
+      helper.wrapNodeForReading(ctx, fqnToWrap);
+   }
+}

Copied: core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java (from rev 6527, core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java	2008-08-06 14:42:03 UTC (rev 6529)
@@ -0,0 +1,446 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DataContainer;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
+import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.buddyreplication.GravitateResult;
+import org.jboss.cache.commands.CommandsFactory;
+import org.jboss.cache.commands.DataCommand;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.read.ExistsCommand;
+import org.jboss.cache.commands.read.GetChildrenNamesCommand;
+import org.jboss.cache.commands.read.GetDataMapCommand;
+import org.jboss.cache.commands.read.GetKeyValueCommand;
+import org.jboss.cache.commands.read.GetKeysCommand;
+import org.jboss.cache.commands.read.GetNodeCommand;
+import org.jboss.cache.commands.read.GravitateDataCommand;
+import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.RollbackCommand;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.marshall.NodeData;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jgroups.Address;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RspFilter;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The Data Gravitator interceptor intercepts cache misses and attempts to
+ * gravitate data from other parts of the cluster.
+ * <p/>
+ * Only used if Buddy Replication is enabled.  Also, the interceptor only kicks
+ * in if an {@link org.jboss.cache.config.Option} is passed in to force Data
+ * Gravitation for a specific invocation or if <b>autoDataGravitation</b> is
+ * set to <b>true</b> when configuring Buddy Replication.
+ * <p/>
+ * See the JBoss Cache User Guide for more details on configuration options.
+ * There is a section dedicated to Buddy Replication in the Replication
+ * chapter.
+ * <p/>
+ * In terms of functionality, if a gravitation call has occured and a cleanup call is needed (based on
+ * how BR is configured), a cleanup call will be broadcast immediately after the gravitation call (no txs)
+ * or if txs are used, an <i>asynchronous</i> call is made to perform the cleanup <i>outside</i> the scope
+ * of the tx that caused the gravitation event.
+ * <p/>
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ * @deprecated will be removed with optimistic and pessimistic locking.
+ */
+ at Deprecated
+public class LegacyDataGravitatorInterceptor extends BaseRpcInterceptor
+{
+   private BuddyManager buddyManager;
+   /**
+    * Map that contains commands that need cleaning up.  This is keyed on global transaction, and contains a list of
+    * cleanup commands corresponding to all gravitate calls made during the course of the transaction in question.
+    */
+   private Map<GlobalTransaction, List<ReplicableCommand>> cleanupCommands = new ConcurrentHashMap<GlobalTransaction, List<ReplicableCommand>>();
+   private DataContainer dataContainer;
+   private CommandsFactory commandsFactory;
+   private CacheSPI cacheSPI;
+   private BuddyFqnTransformer buddyFqnTransformer;
+
+   @Inject
+   public void injectComponents(BuddyManager buddyManager, DataContainer dataContainer, CommandsFactory commandsFactory, CacheSPI cacheSPI, BuddyFqnTransformer transformer)
+   {
+      this.buddyManager = buddyManager;
+      this.dataContainer = dataContainer;
+      this.commandsFactory = commandsFactory;
+      this.cacheSPI = cacheSPI;
+      buddyFqnTransformer = transformer;
+   }
+
+   @Override
+   public Object visitGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
+   {
+      return handleGetMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitGetDataMapCommand(InvocationContext ctx, GetDataMapCommand command) throws Throwable
+   {
+      return handleGetMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitExistsNodeCommand(InvocationContext ctx, ExistsCommand command) throws Throwable
+   {
+      return handleGetMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitGetKeysCommand(InvocationContext ctx, GetKeysCommand command) throws Throwable
+   {
+      return handleGetMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
+   {
+      return handleGetMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitGetNodeCommand(InvocationContext ctx, GetNodeCommand command) throws Throwable
+   {
+      return handleGetMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+   {
+      try
+      {
+         return invokeNextInterceptor(ctx, command);
+      }
+      finally
+      {
+         cleanupCommands.remove(ctx.getGlobalTransaction());
+      }
+   }
+
+   @Override
+   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+   {
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+      try
+      {
+         doCommit(gtx);
+         return invokeNextInterceptor(ctx, command);
+      }
+      finally
+      {
+         cleanupCommands.remove(gtx);
+      }
+   }
+
+   private Object handleGetMethod(InvocationContext ctx, DataCommand command) throws Throwable
+   {
+      if (isGravitationEnabled(ctx))
+      {
+         // test that the Fqn being requested exists locally in the cache.
+         if (trace) log.trace("Checking local existence of requested fqn " + command.getFqn());
+         if (buddyFqnTransformer.isBackupFqn(command.getFqn()))
+         {
+            log.info("Is call for a backup Fqn, not performing any gravitation.  Direct calls on internal backup nodes are *not* supported.");
+         }
+         else
+         {
+            if (!dataContainer.exists(command.getFqn()))
+            {
+               // gravitation is necessary.
+
+               if (trace) log.trace("Gravitating from local backup tree");
+               BackupData data = localBackupGet(command.getFqn(), ctx);
+
+               if (data == null)
+               {
+                  if (trace) log.trace("Gravitating from remote backup tree");
+                  // gravitate remotely.
+                  data = remoteBackupGet(command.getFqn());
+               }
+
+               if (data != null)
+               {
+                  if (trace)
+                     log.trace("Passing the put call locally to make sure state is persisted and ownership is correctly established.");
+                  // store the gravitated node locally.  This will cause it being backed up in the current instance's buddy.
+                  createNode(data.backupData);
+                  cleanBackupData(data, ctx);
+                  wrapIfNeeded(ctx, data.primaryFqn);
+               }
+            }
+            else
+            {
+               if (trace) log.trace("No need to gravitate; have this already.");
+            }
+         }
+      }
+      else
+      {
+         if (trace)
+         {
+            log.trace("Suppressing data gravitation for this call.");
+         }
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   protected void wrapIfNeeded(InvocationContext ctx, Fqn fqnToWrap) throws InterruptedException
+   {
+      // no op
+   }
+
+   private boolean isGravitationEnabled(InvocationContext ctx)
+   {
+      boolean enabled = ctx.isOriginLocal();
+      if (enabled)
+      {
+         if (!buddyManager.isAutoDataGravitation())
+         {
+            enabled = ctx.getOptionOverrides().getForceDataGravitation();
+         }
+      }
+      return enabled;
+   }
+
+   private void doCommit(GlobalTransaction gtx) throws Throwable
+   {
+      if (cleanupCommands.containsKey(gtx))
+      {
+         if (trace) log.trace("Broadcasting cleanup commands for gtx " + gtx);
+
+         for (ReplicableCommand command : cleanupCommands.get(gtx))
+         {
+            try
+            {
+               doCleanup(command);
+            }
+            catch (Throwable th)
+            {
+               log.warn("Problems performing gravitation cleanup.  Cleanup command: " + command, th);
+            }
+         }
+      }
+      else
+      {
+         if (trace) log.trace("No cleanups to broadcast in commit phase for gtx " + gtx);
+      }
+   }
+
+   private BackupData remoteBackupGet(Fqn name) throws Exception
+   {
+      BackupData result = null;
+      GravitateResult gr = gravitateData(name);
+      if (gr.isDataFound())
+      {
+         if (trace)
+         {
+            log.trace("Got response " + gr);
+         }
+
+         result = new BackupData(name, gr);
+      }
+      return result;
+   }
+
+   private void cleanBackupData(BackupData backup, InvocationContext ctx) throws Throwable
+   {
+
+      DataGravitationCleanupCommand cleanupCommand = commandsFactory.buildDataGravitationCleanupCommand(backup.primaryFqn, backup.backupFqn);
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+
+      if (gtx == null)
+      {
+         // broadcast removes
+         // remove main Fqn
+         if (trace) log.trace("Performing cleanup on [" + backup.primaryFqn + "]");
+         // remove backup Fqn
+         doCleanup(cleanupCommand);
+      }
+      else
+      {
+         if (trace)
+            log.trace("Data gravitation performed under global transaction " + gtx + ".  Not broadcasting cleanups until the tx commits.  Recording cleanup command for later use.");
+         List<ReplicableCommand> commands;
+         if (cleanupCommands.containsKey(gtx))
+         {
+            commands = cleanupCommands.get(gtx);
+         }
+         else
+         {
+            commands = new LinkedList<ReplicableCommand>();
+         }
+
+         commands.add(cleanupCommand);
+         cleanupCommands.put(gtx, commands);
+      }
+   }
+
+   private void doCleanup(ReplicableCommand cleanupCommand) throws Throwable
+   {
+      // cleanup commands are always ASYNCHRONOUS and is broadcast to *everyone* (even members of the current buddy
+      // group as they may be members of > 1 buddy group)
+      replicateCall(null, cleanupCommand, false, false, false, true, -1);
+   }
+
+   @SuppressWarnings("deprecation")
+   private GravitateResult gravitateData(Fqn fqn) throws Exception
+   {
+      if (trace) log.trace("Requesting data gravitation for Fqn " + fqn);
+
+      List<Address> mbrs = rpcManager.getMembers();
+      Boolean searchSubtrees = buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE;
+      GravitateDataCommand command = commandsFactory.buildGravitateDataCommand(fqn, searchSubtrees);
+      // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
+      // not have either the primary OR backup, and stop polling other valid nodes.
+      List resps = rpcManager.callRemoteMethods(null, command, GroupRequest.GET_ALL, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, rpcManager.getLocalAddress()), false);
+
+      if (trace) log.trace("got responses " + resps);
+
+      if (resps == null)
+      {
+         if (mbrs.size() > 1) log.error("No replies to call " + command);
+         return GravitateResult.noDataFound();
+      }
+
+      // test for and remove exceptions
+      GravitateResult result = GravitateResult.noDataFound();
+      for (Object o : resps)
+      {
+         if (o instanceof Throwable)
+         {
+            if (log.isDebugEnabled())
+            {
+               log.debug("Found remote Throwable among responses - removing from responses list", (Exception) o);
+            }
+         }
+         else if (o != null)
+         {
+            result = (GravitateResult) o;
+            if (result.isDataFound())
+            {
+               break;
+            }
+         }
+         else if (!configuration.isUseRegionBasedMarshalling())
+         {
+            // Null is OK if we are using region based marshalling; it
+            // is what is returned if a region is inactive. Otherwise
+            // getting a null is an error condition
+            log.error("Unexpected null response to call " + command + ".");
+         }
+
+      }
+
+      return result;
+   }
+
+   @SuppressWarnings("unchecked")
+   private void createNode(List<NodeData> nodeData) throws CacheException
+   {
+      for (NodeData data : nodeData)
+      {
+         cacheSPI.put(data.getFqn(), data.getAttributes());
+      }
+   }
+
+   private BackupData localBackupGet(Fqn fqn, InvocationContext ctx) throws CacheException
+   {
+      GravitateResult result = cacheSPI.gravitateData(fqn, true, ctx);// a "local" gravitation
+      boolean found = result.isDataFound();
+      BackupData data = null;
+
+      if (found)
+      {
+         Fqn backupFqn = result.getBuddyBackupFqn();
+         data = new BackupData(fqn, result);
+         // now the cleanup
+         if (buddyManager.isDataGravitationRemoveOnFind())
+         {
+            // Remove locally only; the remote call will
+            // be broadcast later
+            ctx.getOptionOverrides().setCacheModeLocal(true);
+            cacheSPI.removeNode(backupFqn);
+         }
+         else
+         {
+            cacheSPI.evict(backupFqn, true);
+         }
+      }
+
+      if (trace) log.trace("Retrieved data " + data + " found = " + found);
+      return data;
+   }
+
+   private static class BackupData
+   {
+      Fqn primaryFqn;
+      Fqn backupFqn;
+      List<NodeData> backupData;
+
+      public BackupData(Fqn primaryFqn, GravitateResult gr)
+      {
+         this.primaryFqn = primaryFqn;
+         this.backupFqn = gr.getBuddyBackupFqn();
+         this.backupData = gr.getNodeData();
+      }
+
+      public String toString()
+      {
+         return "BackupData{" +
+               "primaryFqn=" + primaryFqn +
+               ", backupFqn=" + backupFqn +
+               ", backupData=" + backupData +
+               '}';
+      }
+   }
+
+   public static class ResponseValidityFilter implements RspFilter
+   {
+      private int numValidResponses = 0;
+      private List<Address> pendingResponders;
+
+      public ResponseValidityFilter(List<Address> expected, Address localAddress)
+      {
+         // so for now I used a list to keep it consistent
+         this.pendingResponders = new ArrayList<Address>(expected);
+         // We'll never get a response from ourself
+         this.pendingResponders.remove(localAddress);
+      }
+
+      public boolean isAcceptable(Object object, Address address)
+      {
+         pendingResponders.remove(address);
+
+         if (object instanceof GravitateResult)
+         {
+            GravitateResult response = (GravitateResult) object;
+            if (response.isDataFound()) numValidResponses++;
+         }
+         // always return true to make sure a response is logged by the JGroups RpcDispatcher.
+         return true;
+      }
+
+      public boolean needMoreResponses()
+      {
+         return numValidResponses < 1 && pendingResponders.size() > 0;
+      }
+   }
+}


Property changes on: core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java
___________________________________________________________________
Name: svn:keywords
   + Author Date Id Revision
Name: svn:eol-style
   + native

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationConfigTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationConfigTest.java	2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationConfigTest.java	2008-08-06 14:42:03 UTC (rev 6529)
@@ -14,7 +14,7 @@
 import org.jboss.cache.config.parsing.XmlConfigHelper;
 import org.jboss.cache.config.parsing.XmlConfigurationParser;
 import org.jboss.cache.config.parsing.element.BuddyElementParser;
-import org.jboss.cache.interceptors.DataGravitatorInterceptor;
+import org.jboss.cache.interceptors.LegacyDataGravitatorInterceptor;
 import org.jboss.cache.interceptors.base.CommandInterceptor;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
@@ -102,7 +102,7 @@
       boolean hasDG = false;
       for (CommandInterceptor interceptor : cache.getInterceptorChain())
       {
-         hasDG = hasDG || (interceptor instanceof DataGravitatorInterceptor);
+         hasDG = hasDG || (interceptor instanceof LegacyDataGravitatorInterceptor);
       }
 
       System.out.println(cache.getInterceptorChain());

Modified: core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java	2008-08-06 14:37:50 UTC (rev 6528)
+++ core/trunk/src/test/java/org/jboss/cache/factories/InterceptorChainFactoryTest.java	2008-08-06 14:42:03 UTC (rev 6529)
@@ -503,7 +503,7 @@
       assertEquals(OptimisticTxInterceptor.class, interceptors.next().getClass());
       assertEquals(NotificationInterceptor.class, interceptors.next().getClass());
       assertEquals(OptimisticReplicationInterceptor.class, interceptors.next().getClass());
-      assertEquals(DataGravitatorInterceptor.class, interceptors.next().getClass());
+      assertEquals(LegacyDataGravitatorInterceptor.class, interceptors.next().getClass());
       assertEquals(OptimisticLockingInterceptor.class, interceptors.next().getClass());
       assertEquals(OptimisticValidatorInterceptor.class, interceptors.next().getClass());
       assertEquals(OptimisticCreateIfNotExistsInterceptor.class, interceptors.next().getClass());
@@ -534,7 +534,7 @@
       assertEquals(NotificationInterceptor.class, interceptors.next().getClass());
       assertEquals(ReplicationInterceptor.class, interceptors.next().getClass());
       assertEquals(PessimisticLockInterceptor.class, interceptors.next().getClass());
-      assertEquals(DataGravitatorInterceptor.class, interceptors.next().getClass());
+      assertEquals(LegacyDataGravitatorInterceptor.class, interceptors.next().getClass());
       assertEquals(CallInterceptor.class, interceptors.next().getClass());
 
       assertInterceptorLinkage(list);




More information about the jbosscache-commits mailing list