[jbosscache-commits] JBoss Cache SVN: r7802 - in core/trunk/src: main/java/org/jboss/cache/statetransfer and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri Feb 27 07:38:16 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-02-27 07:38:16 -0500 (Fri, 27 Feb 2009)
New Revision: 7802

Added:
   core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
   core/trunk/src/test/java/org/jboss/cache/statetransfer/NBSTCacheLoaderTest.java
Log:
Tweaks to NBST (non-blocking state transfer) code

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-27 10:26:02 UTC (rev 7801)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-27 12:38:16 UTC (rev 7802)
@@ -63,6 +63,7 @@
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.RspFilter;
 import org.jgroups.protocols.TP;
+import org.jgroups.protocols.pbcast.FLUSH;
 import org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER;
 import org.jgroups.stack.ProtocolStack;
 import org.jgroups.util.Rsp;
@@ -379,6 +380,15 @@
       }
    }
 
+   private void sanityCheckJGroupsStack(JChannel channel)
+   {
+      if (channel.getProtocolStack().findProtocol(STREAMING_STATE_TRANSFER.class) == null)
+         throw new ConfigurationException("JGroups channel does not use STREAMING_STATE_TRANSFER!  This is a requirement for non-blocking state transfer.  Either make sure your JGroups configuration uses STREAMING_STATE_TRANSFER or disable non-blocking state transfer.");
+
+      if (channel.getProtocolStack().findProtocol(FLUSH.class) == null)
+         throw new ConfigurationException("JGroups channel does not use FLUSH!  This is a requirement for non-blocking state transfer.  Either make sure your JGroups configuration uses FLUSH or disable non-blocking state transfer.");
+   }
+
    private void sanityCheckConfiguration(boolean nonBlockingStateTransfer, boolean fetchStateOnStart)
    {
       if (isInLocalMode || !nonBlockingStateTransfer || !fetchStateOnStart) return; // don't care about these cases!
@@ -392,16 +402,17 @@
    
    private void startNonBlockStateTransfer(List<Address> members)
    {
-
       if (members.size() < 2)
       {
+         if (log.isInfoEnabled()) log.info("Not retrieving state since cluster size is " + members.size());
          return;
       }
-
       boolean success = false;
+      int numRetries = 2; // should be 5
+      int initwait = 100; // should be 1000
 
       outer:
-      for (int i = 0, wait = 1000; i < 5; i++)
+      for (int i = 0, wait = initwait; i < numRetries; i++)
       {
          for (Address member : members)
          {
@@ -428,11 +439,12 @@
 
          if (!success)
          {
-            if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying");
+            wait <<= 2;
+            if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying after "+wait+" millis.  Retries left: " + (numRetries -1 -i) );
 
             try
             {
-               Thread.sleep(wait <<= 2);
+               Thread.sleep(wait);
             }
             catch (InterruptedException e)
             {
@@ -542,10 +554,8 @@
          configuration.getRuntimeConfig().setChannel(channel);
       }
 
+      if (nbst) sanityCheckJGroupsStack((JChannel) channel);
 
-      if (nbst && ((JChannel) channel).getProtocolStack().findProtocol(STREAMING_STATE_TRANSFER.class) == null)
-            throw new ConfigurationException("JGroups channel does not use STREAMING_STATE_TRANSFER!  This is a requirement for non-blocking state transfer.  Either make sure your JGroups configuration uses STREAMING_STATE_TRANSFER or disable non-blocking state transfer.");
-
       // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
       // remote instances will be received by self.
       channel.setOpt(Channel.LOCAL, false);

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java	2009-02-27 10:26:02 UTC (rev 7801)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java	2009-02-27 12:38:16 UTC (rev 7802)
@@ -23,7 +23,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InternalNode;
@@ -97,7 +96,7 @@
             {
                activated = txLog.activate();
                if (! activated)
-                  throw new CacheException("Busy performing state transfer for someone else");
+                  throw new StateProviderBusyException("Busy performing state transfer for someone else");
 
                if (trace) log.trace("Transaction log activated!");
             }

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2009-02-27 10:26:02 UTC (rev 7801)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2009-02-27 12:38:16 UTC (rev 7802)
@@ -21,16 +21,6 @@
  */
 package org.jboss.cache.statetransfer;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.CacheException;
@@ -61,6 +51,16 @@
 import org.jgroups.Address;
 import org.jgroups.Channel;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class DefaultStateTransferIntegrator implements StateTransferIntegrator
 {
 
@@ -120,6 +120,49 @@
          integrateTxLog(ois);
    }
 
+   private void doPartialFlush(Channel channel, List<Address> members)
+   {
+      int retries = 5;
+      int sleepBetweenRetries = 250;
+      int sleepIncreaseFactor = 2;
+      if (trace) log.trace("Attempting a partial flush on members " + members + " with up to " + retries + " retries.");
+
+      boolean success = false;
+      int i;
+      for (i=1; i<=retries; i++)
+      {
+         if (trace) log.trace("Attempt number " + i);
+         try
+         {
+            if (success = channel.startFlush(members, false)) break;
+            if (trace) log.trace("Channel.startFlush() returned false!");
+         }
+         catch (Exception e)
+         {
+            if (trace) log.trace("Caught exception attempting a partial flush", e);
+         }
+         try
+         {
+            if (trace) log.trace("Partial state transfer failed.  Backing off for " + sleepBetweenRetries + " millis and retrying");
+            Thread.sleep(sleepBetweenRetries);
+            sleepBetweenRetries *= sleepIncreaseFactor;
+         }
+         catch (InterruptedException ie)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+
+      if (success)
+      {
+         if (log.isDebugEnabled()) log.debug("Partial flush between " + members + " succeeded!");
+      }
+      else
+      {
+         throw new CacheException("Could initiate partial flush between " +members+ "! State-transfer failed!");
+      }
+   }
+
    private void integrateTxLog(ObjectInputStream ois) throws Exception
    {
       if (trace)
@@ -129,16 +172,9 @@
 
       Channel channel = manager.getChannel();
 
-      List<Address> targets = new ArrayList<Address>(2);
-      targets.add(channel.getLocalAddress());
-      targets.add(manager.getLastStateTransferSource());
+      List<Address> targets = Arrays.asList(channel.getLocalAddress(), manager.getLastStateTransferSource());
+      doPartialFlush(channel, targets);            
 
-      if (trace)
-         log.trace("Flushing targets: " + targets);
-
-      if (!channel.startFlush(targets, false))
-         throw new CacheException("Could not flush channel! State-transfer failed!");
-
       try
       {
          if (trace)
@@ -169,8 +205,7 @@
       }
       finally
       {
-         if (trace)
-            log.trace("Stopping flush");
+         if (trace) log.trace("Stopping partial flush");
          channel.stopFlush(targets);
       }
    }

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java	2009-02-27 10:26:02 UTC (rev 7801)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java	2009-02-27 12:38:16 UTC (rev 7802)
@@ -103,6 +103,7 @@
          if (log.isDebugEnabled())
             log.debug("Generating in-memory (transient) state for subtree " + fqn);
 
+         // this method will throw a StateProviderBusyException if the state provider is busy providing state to someone else.
          generator.generateState(out, subtreeRoot, fetchTransientState, fetchPersistentState, suppressErrors);
 
          if (log.isDebugEnabled())

Added: core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java	2009-02-27 12:38:16 UTC (rev 7802)
@@ -0,0 +1,29 @@
+package org.jboss.cache.statetransfer;
+
+/**
+ * Thrown when a state provider is busy providing state to another requester and cannot serve this request
+ *
+ * @author Manik Surtani
+ * @since 3.1
+ */
+public class StateProviderBusyException extends Exception
+{
+   public StateProviderBusyException()
+   {
+   }
+
+   public StateProviderBusyException(String message)
+   {
+      super(message);
+   }
+
+   public StateProviderBusyException(String message, Throwable cause)
+   {
+      super(message, cause);
+   }
+
+   public StateProviderBusyException(Throwable cause)
+   {
+      super(cause);
+   }
+}

Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/NBSTCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/NBSTCacheLoaderTest.java	2009-02-27 10:26:02 UTC (rev 7801)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/NBSTCacheLoaderTest.java	2009-02-27 12:38:16 UTC (rev 7802)
@@ -10,7 +10,7 @@
 
 import java.io.IOException;
 
- at Test(groups = "functional", testName = "statetransfer.NBSTCacheLoaderTest", enabled = false)
+ at Test(groups = "functional", testName = "statetransfer.NBSTCacheLoaderTest")
 public class NBSTCacheLoaderTest extends NonBlockingStateTransferTest
 {
    int id;




More information about the jbosscache-commits mailing list