Author: manik.surtani(a)jboss.com
Date: 2009-02-27 07:38:16 -0500 (Fri, 27 Feb 2009)
New Revision: 7802
Added:
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/NBSTCacheLoaderTest.java
Log:
Tweaks to NBST code
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-27 10:26:02 UTC
(rev 7801)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-27 12:38:16 UTC
(rev 7802)
@@ -63,6 +63,7 @@
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RspFilter;
import org.jgroups.protocols.TP;
+import org.jgroups.protocols.pbcast.FLUSH;
import org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER;
import org.jgroups.stack.ProtocolStack;
import org.jgroups.util.Rsp;
@@ -379,6 +380,15 @@
}
}
+ private void sanityCheckJGroupsStack(JChannel channel)
+ {
+ if (channel.getProtocolStack().findProtocol(STREAMING_STATE_TRANSFER.class) ==
null)
+ throw new ConfigurationException("JGroups channel does not use
STREAMING_STATE_TRANSFER! This is a requirement for non-blocking state transfer. Either
make sure your JGroups configuration uses STREAMING_STATE_TRANSFER or disable non-blocking
state transfer.");
+
+ if (channel.getProtocolStack().findProtocol(FLUSH.class) == null)
+ throw new ConfigurationException("JGroups channel does not use FLUSH! This
is a requirement for non-blocking state transfer. Either make sure your JGroups
configuration uses FLUSH or disable non-blocking state transfer.");
+ }
+
private void sanityCheckConfiguration(boolean nonBlockingStateTransfer, boolean
fetchStateOnStart)
{
if (isInLocalMode || !nonBlockingStateTransfer || !fetchStateOnStart) return; //
don't care about these cases!
@@ -392,16 +402,17 @@
private void startNonBlockStateTransfer(List<Address> members)
{
-
if (members.size() < 2)
{
+ if (log.isInfoEnabled()) log.info("Not retrieving state since cluster size
is " + members.size());
return;
}
-
boolean success = false;
+ int numRetries = 2; // should be 5
+ int initwait = 100; // should be 1000
outer:
- for (int i = 0, wait = 1000; i < 5; i++)
+ for (int i = 0, wait = initwait; i < numRetries; i++)
{
for (Address member : members)
{
@@ -428,11 +439,12 @@
if (!success)
{
- if (log.isWarnEnabled()) log.warn("Could not find available peer for
state, backing off and retrying");
+ wait <<= 2;
+ if (log.isWarnEnabled()) log.warn("Could not find available peer for
state, backing off and retrying after "+wait+" millis. Retries left: " +
(numRetries -1 -i) );
try
{
- Thread.sleep(wait <<= 2);
+ Thread.sleep(wait);
}
catch (InterruptedException e)
{
@@ -542,10 +554,8 @@
configuration.getRuntimeConfig().setChannel(channel);
}
+ if (nbst) sanityCheckJGroupsStack((JChannel) channel);
- if (nbst && ((JChannel)
channel).getProtocolStack().findProtocol(STREAMING_STATE_TRANSFER.class) == null)
- throw new ConfigurationException("JGroups channel does not use
STREAMING_STATE_TRANSFER! This is a requirement for non-blocking state transfer. Either
make sure your JGroups configuration uses STREAMING_STATE_TRANSFER or disable non-blocking
state transfer.");
-
// Channel.LOCAL *must* be set to false so we don't see our own messages -
otherwise invalidations targeted at
// remote instances will be received by self.
channel.setOpt(Channel.LOCAL, false);
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2009-02-27
10:26:02 UTC (rev 7801)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2009-02-27
12:38:16 UTC (rev 7802)
@@ -23,7 +23,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InternalNode;
@@ -97,7 +96,7 @@
{
activated = txLog.activate();
if (! activated)
- throw new CacheException("Busy performing state transfer for
someone else");
+ throw new StateProviderBusyException("Busy performing state
transfer for someone else");
if (trace) log.trace("Transaction log activated!");
}
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-02-27
10:26:02 UTC (rev 7801)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-02-27
12:38:16 UTC (rev 7802)
@@ -21,16 +21,6 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
@@ -61,6 +51,16 @@
import org.jgroups.Address;
import org.jgroups.Channel;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
@@ -120,6 +120,49 @@
integrateTxLog(ois);
}
+ private void doPartialFlush(Channel channel, List<Address> members)
+ {
+ int retries = 5;
+ int sleepBetweenRetries = 250;
+ int sleepIncreaseFactor = 2;
+ if (trace) log.trace("Attempting a partial flush on members " + members +
" with up to " + retries + " retries.");
+
+ boolean success = false;
+ int i;
+ for (i=1; i<=retries; i++)
+ {
+ if (trace) log.trace("Attempt number " + i);
+ try
+ {
+ if (success = channel.startFlush(members, false)) break;
+ if (trace) log.trace("Channel.startFlush() returned false!");
+ }
+ catch (Exception e)
+ {
+ if (trace) log.trace("Caught exception attempting a partial flush",
e);
+ }
+ try
+ {
+ if (trace) log.trace("Partial state transfer failed. Backing off for
" + sleepBetweenRetries + " millis and retrying");
+ Thread.sleep(sleepBetweenRetries);
+ sleepBetweenRetries *= sleepIncreaseFactor;
+ }
+ catch (InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (success)
+ {
+ if (log.isDebugEnabled()) log.debug("Partial flush between " + members
+ " succeeded!");
+ }
+ else
+ {
+ throw new CacheException("Could initiate partial flush between "
+members+ "! State-transfer failed!");
+ }
+ }
+
private void integrateTxLog(ObjectInputStream ois) throws Exception
{
if (trace)
@@ -129,16 +172,9 @@
Channel channel = manager.getChannel();
- List<Address> targets = new ArrayList<Address>(2);
- targets.add(channel.getLocalAddress());
- targets.add(manager.getLastStateTransferSource());
+ List<Address> targets = Arrays.asList(channel.getLocalAddress(),
manager.getLastStateTransferSource());
+ doPartialFlush(channel, targets);
- if (trace)
- log.trace("Flushing targets: " + targets);
-
- if (!channel.startFlush(targets, false))
- throw new CacheException("Could not flush channel! State-transfer
failed!");
-
try
{
if (trace)
@@ -169,8 +205,7 @@
}
finally
{
- if (trace)
- log.trace("Stopping flush");
+ if (trace) log.trace("Stopping partial flush");
channel.stopFlush(targets);
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2009-02-27
10:26:02 UTC (rev 7801)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2009-02-27
12:38:16 UTC (rev 7802)
@@ -103,6 +103,7 @@
if (log.isDebugEnabled())
log.debug("Generating in-memory (transient) state for subtree " +
fqn);
+ // this method will throw a StateProviderBusyException if the state provider is
busy providing state to someone else.
generator.generateState(out, subtreeRoot, fetchTransientState,
fetchPersistentState, suppressErrors);
if (log.isDebugEnabled())
Added:
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java 2009-02-27
12:38:16 UTC (rev 7802)
@@ -0,0 +1,29 @@
+package org.jboss.cache.statetransfer;
+
+/**
+ * Thrown when a state provider is busy
+ *
+ * @author Manik Surtani
+ * @since 3.1
+ */
+public class StateProviderBusyException extends Exception
+{
+ public StateProviderBusyException()
+ {
+ }
+
+ public StateProviderBusyException(String message)
+ {
+ super(message);
+ }
+
+ public StateProviderBusyException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public StateProviderBusyException(Throwable cause)
+ {
+ super(cause);
+ }
+}
Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/NBSTCacheLoaderTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/statetransfer/NBSTCacheLoaderTest.java 2009-02-27
10:26:02 UTC (rev 7801)
+++
core/trunk/src/test/java/org/jboss/cache/statetransfer/NBSTCacheLoaderTest.java 2009-02-27
12:38:16 UTC (rev 7802)
@@ -10,7 +10,7 @@
import java.io.IOException;
-@Test(groups = "functional", testName =
"statetransfer.NBSTCacheLoaderTest", enabled = false)
+@Test(groups = "functional", testName =
"statetransfer.NBSTCacheLoaderTest")
public class NBSTCacheLoaderTest extends NonBlockingStateTransferTest
{
int id;