[jboss-cvs] JBossCache/tests/functional/org/jboss/cache/statetransfer ...
Vladimir Blagojevic
vladimir.blagojevic at jboss.com
Sat Jul 21 13:00:42 EDT 2007
User: vblagojevic
Date: 07/07/21 13:00:42
Modified: tests/functional/org/jboss/cache/statetransfer
StateTransferTestBase.java
StateTransfer200Test.java
Added: tests/functional/org/jboss/cache/statetransfer
StateTransferConcurrencyTest.java
Removed: tests/functional/org/jboss/cache/statetransfer
VersionedTestBase.java
Log:
[JBCACHE-983] - State transfer test failures ( to do with concurrent eviction and activation )
Revision Changes Path
1.40 +21 -1 JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransferTestBase.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransferTestBase.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransferTestBase.java,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -b -r1.39 -r1.40
--- StateTransferTestBase.java 14 Jun 2007 15:30:14 -0000 1.39
+++ StateTransferTestBase.java 21 Jul 2007 17:00:42 -0000 1.40
@@ -26,6 +26,7 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
+import org.jboss.cache.Region;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
@@ -49,9 +50,21 @@
*/
public abstract class StateTransferTestBase extends AbstractCacheLoaderTestBase
{
+
+
+ protected static final int SUBTREE_SIZE = 10;
+
+ public static final Fqn A = Fqn.fromString("/a");
+ public static final Fqn B = Fqn.fromString("/b");
+ public static final Fqn C = Fqn.fromString("/c");
+
+ protected static final String ADDRESS_CLASSNAME = "org.jboss.cache.marshall.data.Address";
+ protected static final String PERSON_CLASSNAME = "org.jboss.cache.marshall.data.Person";
+
public static final Fqn A_B = Fqn.fromString("/a/b");
public static final Fqn A_C = Fqn.fromString("/a/c");
public static final Fqn A_D = Fqn.fromString("/a/d");
+
public static final String JOE = "JOE";
public static final String BOB = "BOB";
public static final String JANE = "JANE";
@@ -140,6 +153,13 @@
return tree;
}
+ protected void createAndActivateRegion(CacheSPI c, Fqn f)
+ {
+ Region r = c.getRegion(f, true);
+ r.registerContextClassLoader(getClass().getClassLoader());
+ r.activate();
+ }
+
/**
* Provides a hook for multiplexer integration. This default implementation
* is a no-op; subclasses that test mux integration would override
1.21 +308 -2 JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransfer200Test.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: StateTransfer200Test.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransfer200Test.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -b -r1.20 -r1.21
--- StateTransfer200Test.java 18 Jul 2007 20:35:00 -0000 1.20
+++ StateTransfer200Test.java 21 Jul 2007 17:00:42 -0000 1.21
@@ -7,12 +7,16 @@
package org.jboss.cache.statetransfer;
+import java.lang.reflect.Method;
+
import org.jboss.cache.CacheImpl;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
+import org.jboss.cache.Region;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.config.BuddyReplicationConfig;
import org.jboss.cache.factories.XmlConfigurationParser;
+import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.misc.TestingUtil;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -24,9 +28,9 @@
* Tests that state transfer works properly if the version is 2.0.0.GA.
*
* @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
- * @version $Revision: 1.20 $
+ * @version $Revision: 1.21 $
*/
-public class StateTransfer200Test extends VersionedTestBase
+public class StateTransfer200Test extends StateTransferTestBase
{
protected String getReplicationVersion()
@@ -114,6 +118,308 @@
assertFalse("/a/b is not in cache loader ", cache2.getCacheLoaderManager().getCacheLoader().exists(A_B));
}
+ public void testLoadEntireStateAfterStart() throws Exception
+ {
+ CacheSPI cache1 = createCache("cache1", false, true, true);
+
+ createAndActivateRegion(cache1, Fqn.ROOT);
+
+ cache1.put(A_B, "name", JOE);
+ cache1.put(A_B, "age", TWENTY);
+ cache1.put(A_C, "name", BOB);
+ cache1.put(A_C, "age", FORTY);
+
+ CacheSPI cache2 = createCache("cache2", false, true, true);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+ CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
+
+ assertNull("/a/b transferred to loader against policy", loader.get(A_B));
+
+ assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
+ assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
+ assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
+ assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
+
+ createAndActivateRegion(cache2, Fqn.ROOT);
+
+ assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
+ assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
+ assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
+ assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
+
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
+ }
+
+
+ public void testInitialStateTransfer() throws Exception
+ {
+ CacheSPI cache1 = createCache("cache1", false, false, false);
+
+ cache1.put(A_B, "name", JOE);
+ cache1.put(A_B, "age", TWENTY);
+ cache1.put(A_C, "name", BOB);
+ cache1.put(A_C, "age", FORTY);
+
+ CacheSPI cache2 = createCache("cache2", false, false, false);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
+ }
+
+ public void testInitialStateTferWithLoader() throws Exception
+ {
+ initialStateTferWithLoaderTest(false);
+ }
+
+ public void testInitialStateTferWithAsyncLoader() throws Exception
+ {
+ initialStateTferWithLoaderTest(true);
+ }
+
+ protected void initialStateTferWithLoaderTest(boolean asyncLoader) throws Exception
+ {
+ initialStateTferWithLoaderTest("org.jboss.cache.loader.FileCacheLoader",
+ "org.jboss.cache.loader.FileCacheLoader", asyncLoader);
+ }
+
+ public void testPartialStateTransfer() throws Exception
+ {
+ CacheSPI cache1 = createCache("cache1", false, true, false);
+
+ createAndActivateRegion(cache1, A);
+
+ cache1.put(A_B, "name", JOE);
+ cache1.put(A_B, "age", TWENTY);
+ cache1.put(A_C, "name", BOB);
+ cache1.put(A_C, "age", FORTY);
+
+ CacheSPI cache2 = createCache("cache2", false, true, false);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+ assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
+ assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
+ assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
+ assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
+
+ createAndActivateRegion(cache2, A_B);
+
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+ assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
+ assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
+
+ cache1.put(A_D, "name", JANE);
+
+ assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
+
+ createAndActivateRegion(cache2, A_C);
+
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
+ assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
+
+ createAndActivateRegion(cache2, A_D);
+
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
+ assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));
+
+
+ cache1.getRegion(A, false).deactivate();
+ createAndActivateRegion(cache1, A_B);
+ createAndActivateRegion(cache1, A_C);
+ createAndActivateRegion(cache1, A_D);
+
+ assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
+ assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
+
+ }
+
+ public void testPartialStateTferWithLoader() throws Exception
+ {
+ CacheSPI cache1 = createCache("cache1", false, true, true);
+
+ createAndActivateRegion(cache1, A);
+
+ cache1.put(A_B, "name", JOE);
+ cache1.put(A_B, "age", TWENTY);
+ cache1.put(A_C, "name", BOB);
+ cache1.put(A_C, "age", FORTY);
+
+ CacheSPI cache2 = createCache("cache2", false, true, true);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+ CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
+
+ assertNull("/a/b transferred to loader against policy", loader.get(A_B));
+
+ assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
+ assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
+ assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
+ assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
+
+ createAndActivateRegion(cache2, A_B);
+
+ assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
+ assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
+ assertNull("/a/c transferred to loader against policy", loader.get(A_C));
+
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+ assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
+ assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
+
+ cache1.put(A_D, "name", JANE);
+
+ assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
+
+ createAndActivateRegion(cache2, A_C);
+
+ assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
+ assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
+ assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
+ assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
+
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
+ assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
+
+ createAndActivateRegion(cache2, A_D);
+
+ assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
+ assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
+ assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
+ assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
+ assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));
+
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
+ assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));
+
+ cache1.getRegion(A, false).deactivate();
+
+ createAndActivateRegion(cache1, A_B);
+ createAndActivateRegion(cache1, A_C);
+ createAndActivateRegion(cache1, A_D);
+
+ loader = cache1.getCacheLoaderManager().getCacheLoader();
+
+ assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
+ assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
+ assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
+ assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
+ assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));
+
+ assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
+ assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
+ }
+
+ public void testPartialStateTferWithClassLoader() throws Exception
+ {
+ // FIXME: This test is meaningless because MarshalledValueInputStream
+ // will find the classes w/ their own loader if TCL can't. Need
+ // to find a way to test!
+ // But, at least it tests JBCACHE-305 by registering a classloader
+ // both before and after start()
+
+ // Set the TCL to a classloader that can't see Person/Address
+ Thread.currentThread().setContextClassLoader(getNotFoundClassLoader());
+
+ CacheSPI cache1 = createCache("cache1",
+ false, // async
+ true, // use marshaller
+ true, // use cacheloader
+ false, false);// don't start
+ ClassLoader cl1 = getClassLoader();
+ cache1.getRegion(A, true).registerContextClassLoader(cl1);
+ startCache(cache1);
+
+ cache1.getRegion(A, true).activate();
+
+ Object ben = createBen(cl1);
+
+ cache1.put(A_B, "person", ben);
+
+ // For cache 2 we won't register loader until later
+ CacheSPI cache2 = createCache("cache2",
+ false, // async
+ true, // use marshalling
+ true, // use cacheloader
+ false, true);// start
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+ CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
+
+ assertNull("/a/b not transferred to loader", loader.get(A_B));
+
+ assertNull("/a/b not transferred to cache", cache2.get(A_B, "person"));
+
+ ClassLoader cl2 = getClassLoader();
+
+ // cache2.registerClassLoader(A, cl2);
+ Region r = cache2.getRegion(A, true);
+ r.registerContextClassLoader(cl2);
+
+ r.activate();
+
+ assertEquals("Correct state from loader for /a/b", ben.toString(), loader.get(A_B).get("person").toString());
+
+ assertEquals("Correct state from cache for /a/b", ben.toString(), cache2.get(A_B, "person").toString());
+
+ }
+
+ private Object createBen(ClassLoader loader) throws Exception
+ {
+ Class addrClazz = loader.loadClass(ADDRESS_CLASSNAME);
+ Method setCity = addrClazz.getMethod("setCity", new Class[]{String.class});
+ Method setStreet = addrClazz.getMethod("setStreet", new Class[]{String.class});
+ Method setZip = addrClazz.getMethod("setZip", new Class[]{int.class});
+ Object addr = addrClazz.newInstance();
+ setCity.invoke(addr, new Object[]{"San Jose"});
+ setStreet.invoke(addr, new Object[]{"1007 Home"});
+ setZip.invoke(addr, new Object[]{90210});
+
+ Class benClazz = loader.loadClass(PERSON_CLASSNAME);
+ Method setName = benClazz.getMethod("setName", new Class[]{String.class});
+ Method setAddress = benClazz.getMethod("setAddress", new Class[]{addrClazz});
+ Object ben = benClazz.newInstance();
+ setName.invoke(ben, new Object[]{"Ben"});
+ setAddress.invoke(ben, new Object[]{addr});
+
+ return ben;
+ }
+
private BuddyReplicationConfig getBuddyConfig() throws Exception
{
1.1 date: 2007/07/21 17:00:42; author: vblagojevic; state: Exp;JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java
Index: StateTransferConcurrencyTest.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
import org.jboss.cache.Region;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.marshall.InactiveRegionException;
import org.jboss.cache.misc.TestingUtil;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* Tests of CacheSPI's state transfer capability under concurrency
* (concurrent region activation, concurrent cache use and eviction),
* run with replication version 2.0.0.GA.
* <p/>
* TODO add tests with classloader regions
*
* @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
* @version $Id: StateTransferConcurrencyTest.java,v 1.1 2007/07/21 17:00:42 vblagojevic Exp $
*/
public class StateTransferConcurrencyTest extends StateTransferTestBase
{
/**
 * Returns the replication version string for this test class.
 *
 * @return the literal <code>"2.0.0.GA"</code>
 */
protected String getReplicationVersion()
{
   final String replicationVersion = "2.0.0.GA";
   return replicationVersion;
}
/**
 * Runs <code>concurrentActivationTest</code> with its <code>sync</code>
 * flag set, i.e. the REPL_SYNC variant of the scenario in which several
 * caches activate the same subtree at the same time (as would happen with
 * a farmed deployment). See <code>concurrentActivationTest</code> for details.
 *
 * @throws Exception if the underlying scenario throws
 */
public void testConcurrentActivationSync() throws Exception
{
   final boolean useSyncReplication = true;
   concurrentActivationTest(useSyncReplication);
}
/**
 * Runs <code>concurrentActivationTest</code> with its <code>sync</code>
 * flag cleared, i.e. the REPL_ASYNC variant of the scenario in which
 * several caches activate the same subtree at the same time (as would
 * happen with a farmed deployment). See <code>concurrentActivationTest</code>
 * for details.
 *
 * @throws Exception if the underlying scenario throws
 */
public void testConcurrentActivationAsync() throws Exception
{
   final boolean useSyncReplication = false;
   concurrentActivationTest(useSyncReplication);
}
/**
 * Starts 5 caches and then concurrently activates the same region under
 * all 5, causing each to attempt a partial state transfer from the others.
 * As soon as each cache has activated its region, it does a put to a node
 * in the region, thus complicating the lives of the other caches trying
 * to get partial state.
 * <p/>
 * Failure condition is if any activator reports an exception that is not
 * an <code>InactiveRegionException</code> (directly or as its cause), or
 * if the final state of all caches is not consistent.
 * <p/>
 * Exceptions raised while driving the scenario are propagated to the
 * caller unchanged so that their stack trace and cause are preserved.
 *
 * @param sync use REPL_SYNC or REPL_ASYNC
 * @throws Exception if setting up or driving the scenario fails
 */
private void concurrentActivationTest(boolean sync) throws Exception
{
   String[] names = {"A", "B", "C", "D", "E"};
   int count = names.length;
   CacheActivator[] activators = new CacheActivator[count];

   try
   {
      // Start with no tickets available, so the activation threads
      // block until the tickets are released below
      Semaphore semaphore = new Semaphore(0);

      // Create activation threads that will block on the semaphore
      CacheSPI[] caches = new CacheSPI[count];
      for (int i = 0; i < count; i++)
      {
         activators[i] = new CacheActivator(semaphore, names[i], sync);
         caches[i] = activators[i].getCacheSPI();
         activators[i].start();
      }

      // Make sure everyone is in sync
      TestingUtil.blockUntilViewsReceived(caches, 60000);

      // Release the semaphore to allow the threads to start work
      semaphore.release(count);

      // Sleep to ensure the threads get all the semaphore tickets
      TestingUtil.sleepThread(1000L);

      // Reacquire the semaphore tickets; when we have them all
      // we know the threads are done
      for (int i = 0; i < count; i++)
      {
         boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
         if (!acquired)
         {
            fail("failed to acquire semaphore " + i);
         }
      }

      // Sleep to allow any async calls to clear
      if (!sync)
      {
         TestingUtil.sleepThread(1000);
      }

      // Ensure the caches held by the activators see all the values
      for (int i = 0; i < count; i++)
      {
         // An InactiveRegionException (direct or as the cause) is tolerated;
         // anything else reported by the activator fails the test
         Exception aException = activators[i].getException();
         boolean gotUnexpectedException = aException != null
               && !(aException instanceof InactiveRegionException ||
               aException.getCause() instanceof InactiveRegionException);
         if (gotUnexpectedException)
         {
            fail("Activator " + names[i] + " caught an exception " + aException);
         }

         for (int j = 0; j < count; j++)
         {
            Fqn fqn = new Fqn(A_B, names[j]);
            assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
                  "VALUE", activators[i].getCacheValue(fqn));
         }
      }
   }
   finally
   {
      // Only clean up the activators that were actually created; an early
      // failure leaves the remaining slots null and must not be masked by
      // a NullPointerException thrown from here
      for (int i = 0; i < count; i++)
      {
         if (activators[i] != null)
         {
            activators[i].cleanup();
         }
      }
   }
}
/**
 * Starts two caches where each cache has N regions. We put some data in each of the regions.
 * We run two threads where each thread creates a cache then goes into a loop where it
 * activates the N regions, with a 1 sec pause between activations.
 * <p/>
 * Threads are started with 10 sec difference.
 * <p/>
 * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
 * then deploying webapps.
 * <p/>
 * Failure condition is if any activator reports an exception that is not
 * an <code>InactiveRegionException</code> (directly or as its cause), or
 * if the final state of all caches is not consistent.
 * <p/>
 * Exceptions raised while driving the scenario are propagated to the
 * caller unchanged so that their stack trace and cause are preserved.
 *
 * @param sync use REPL_SYNC or REPL_ASYNC
 * @throws Exception if setting up or driving the scenario fails
 */
private void concurrentActivationTest2(boolean sync) throws Exception
{
   String[] names = {"A", "B"};
   int count = names.length;
   int regionsToActivate = 15;
   int sleepTimeBetweenNodeStarts = 10000;
   StaggeredWebDeployerActivator[] activators = new StaggeredWebDeployerActivator[count];

   try
   {
      // Start with no tickets available; one ticket is handed out
      // per activator just before that activator is started
      Semaphore semaphore = new Semaphore(0);

      // Create activation threads that will block on the semaphore
      CacheSPI[] caches = new CacheSPI[count];
      for (int i = 0; i < count; i++)
      {
         activators[i] = new StaggeredWebDeployerActivator(semaphore, names[i], sync, regionsToActivate);
         caches[i] = activators[i].getCacheSPI();

         // Release the semaphore to allow the thread to start working
         semaphore.release(1);
         activators[i].start();

         // Stagger the start of the next node
         TestingUtil.sleepThread(sleepTimeBetweenNodeStarts);
      }

      // Make sure everyone is in sync
      TestingUtil.blockUntilViewsReceived(caches, 60000);

      // Sleep to ensure the threads get all the semaphore tickets
      TestingUtil.sleepThread(1000);

      // Reacquire the semaphore tickets; when we have them all
      // we know the threads are done
      for (int i = 0; i < count; i++)
      {
         boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
         if (!acquired)
         {
            fail("failed to acquire semaphore " + i);
         }
      }

      // Sleep to allow any async calls to clear
      if (!sync)
      {
         TestingUtil.sleepThread(1000);
      }

      // Ensure the caches held by the activators see all the values
      for (int i = 0; i < count; i++)
      {
         // An InactiveRegionException (direct or as the cause) is tolerated;
         // anything else reported by the activator fails the test
         Exception aException = activators[i].getException();
         boolean gotUnexpectedException = aException != null
               && !(aException instanceof InactiveRegionException ||
               aException.getCause() instanceof InactiveRegionException);
         if (gotUnexpectedException)
         {
            fail("Activator " + names[i] + " caught an exception " + aException);
         }

         // Check this activator's node under every activated region. The
         // region index must be the inner loop variable; using the activator
         // index here would re-check a single node regionsToActivate times.
         // NOTE(review): assumes StaggeredWebDeployerActivator puts "VALUE"
         // at /a/<region>/<name> for each region it activates -- confirm
         // against the activator implementation.
         for (int j = 0; j < regionsToActivate; j++)
         {
            Fqn fqn = Fqn.fromString("/a/" + j + "/" + names[i]);
            assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
                  "VALUE", activators[i].getCacheValue(fqn));
         }
      }
   }
   finally
   {
      // Only clean up the activators that were actually created; an early
      // failure leaves the remaining slots null and must not be masked by
      // a NullPointerException thrown from here
      for (int i = 0; i < count; i++)
      {
         if (activators[i] != null)
         {
            activators[i].cleanup();
         }
      }
   }
}
/**
 * Runs <code>concurrentActivationTest2</code> with its <code>sync</code>
 * flag cleared, i.e. the REPL_ASYNC variant of the staggered-startup
 * scenario. See <code>concurrentActivationTest2</code> for a description
 * of the scenario and its failure conditions.
 *
 * @throws Exception if the underlying scenario throws
 */
public void testConcurrentStartupActivationAsync() throws Exception
{
   final boolean useSyncReplication = false;
   concurrentActivationTest2(useSyncReplication);
}
/**
 * Runs <code>concurrentActivationTest2</code> with its <code>sync</code>
 * flag set, i.e. the REPL_SYNC variant of the staggered-startup scenario.
 * See <code>concurrentActivationTest2</code> for a description of the
 * scenario and its failure conditions.
 *
 * @throws Exception if the underlying scenario throws
 */
public void testConcurrentStartupActivationSync() throws Exception
{
   final boolean useSyncReplication = true;
   concurrentActivationTest2(useSyncReplication);
}
/**
 * Runs <code>concurrentUseTest</code> with its <code>sync</code> flag set:
 * partial state transfer under heavy concurrent load and REPL_SYNC.
 * See <code>concurrentUseTest</code> for details.
 *
 * @throws Exception if the underlying scenario throws
 */
public void testConcurrentUseSync() throws Exception
{
   final boolean useSyncReplication = true;
   concurrentUseTest(useSyncReplication);
}
/**
 * Runs <code>concurrentUseTest</code> with its <code>sync</code> flag
 * cleared: partial state transfer under heavy concurrent load and
 * REPL_ASYNC. See <code>concurrentUseTest</code> for details.
 *
 * @throws Exception if the underlying scenario throws
 */
public void testConcurrentUseAsync() throws Exception
{
   final boolean useSyncReplication = false;
   concurrentUseTest(useSyncReplication);
}
/**
* Initiates 5 caches, 4 with active trees and one with an inactive tree.
* Each of the active caches begins rapidly generating puts against nodes
* in a subtree for which it is responsible. The 5th cache activates
* each subtree, and at the end confirms no node saw any exceptions and
* that each node has consistent state.
* <p/>
* An <code>InactiveRegionException</code> reported by a stressor is
* tolerated; any other stressor exception fails the test.
*
* @param sync whether to use REPL_SYNC or REPL_ASYNC
* @throws Exception if setting up or driving the scenario fails
*/
private void concurrentUseTest(boolean sync) throws Exception
{
// One stressor per name; each name is also the top-level subtree
// ("/B", "/C", ...) that is deactivated/activated on cacheA below
String[] names = {"B", "C", "D", "E"};
int count = names.length;
CacheStressor[] stressors = new CacheStressor[count];
try
{
// The first cache we create is inactivated.
CacheSPI cacheA = createCache("cacheA", sync, true, false);
// Slot 0 holds cacheA, slots 1..count hold the stressors' caches
CacheSPI[] caches = new CacheSPI[count + 1];
caches[0] = cacheA;
// Create a semaphore and take all its tickets
Semaphore semaphore = new Semaphore(count);
for (int i = 0; i < count; i++)
{
semaphore.acquire();
}
// Create stressor threads that will block on the semaphore
for (int i = 0; i < count; i++)
{
stressors[i] = new CacheStressor(semaphore, names[i], sync);
caches[i + 1] = stressors[i].getCacheSPI();
stressors[i].start();
}
// Make sure everyone's views are in sync
TestingUtil.blockUntilViewsReceived(caches, 60000);
// Repeat the basic test four times
// (currently reduced to a single run; the four-run loop header is
// kept commented out below)
//for (int x = 0; x < 4; x++)
for (int x = 0; x < 1; x++)
{
// Reset things by inactivating the region
// and enabling the stressors
for (int i = 0; i < count; i++)
{
Region r = cacheA.getRegion(Fqn.fromString("/" + names[i]), true);
r.registerContextClassLoader(getClass().getClassLoader());
r.deactivate();
System.out.println("Run " + x + "-- /" + names[i] + " deactivated on A");
stressors[i].startPuts();
}
// Release the semaphore to allow the threads to start work
semaphore.release(count);
// Sleep to ensure the threads get all the semaphore tickets
// and to ensure puts are actively in progress
TestingUtil.sleepThread((long) 300);
// Activate cacheA
for (int i = 0; i < count; i++)
{
// System.out.println("Activating /" + names[i] + " on A");
cacheA.getRegion(Fqn.fromString("/" + names[i]), true).activate();
// Stop the stressor so we don't pollute cacheA's state
// with too many messages sent after activation -- we want
// to compare transferred state with the sender
stressors[i].stopPuts();
System.out.println("Run " + x + "-- /" + names[i] + " activated on A");
// Reacquire one semaphore ticket
boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
if (!acquired)
{
fail("failed to acquire semaphore " + i);
}
// Pause to allow other work to proceed
TestingUtil.sleepThread(100);
}
// Sleep to allow any in transit msgs to clear
// (done unconditionally; the sync-only guard is left commented out)
// if (!sync)
TestingUtil.sleepThread(1000);
// Ensure the stressors saw no exceptions
// NOTE(review): unlike the activation tests, only the exception itself
// is checked against InactiveRegionException here, not its cause
for (int i = 0; i < count; i++)
{
if (stressors[i].getException() != null && !(stressors[i].getException() instanceof InactiveRegionException))
{
fail("Stressor " + names[i] + " caught an exception " + stressors[i].getException());
}
}
TestingUtil.sleepThread(1000);
// Compare cache contents
// Expected value is what cacheA holds at fqn; actual value is what the
// stressor's own cache holds at the same fqn.
// NOTE(review): the "/A/" + j label in the message is not an Fqn that is
// read here -- both sides are read from fqn.
for (int i = 0; i < count; i++)
{
for (int j = 0; j < SUBTREE_SIZE; j++)
{
Fqn fqn = Fqn.fromString("/" + names[i] + "/" + j);
assertEquals("/A/" + j + " matches " + fqn,
cacheA.get(fqn, "KEY"),
stressors[i].getCacheSPI().get(fqn, "KEY"));
}
}
}
// Normal completion: ask every stressor thread to stop
for (int i = 0; i < count; i++)
{
stressors[i].stopThread();
}
}
finally
{
// Clean up only the stressors that were actually created; slots are
// still null if setup failed part-way through
for (int i = 0; i < count; i++)
{
if (stressors[i] != null)
{
stressors[i].cleanup();
}
}
}
}
/**
 * Test for JBCACHE-913.
 * <p/>
 * Starts a first REPL_SYNC cache and puts a value at /a/b/c, then starts a
 * second REPL_SYNC cache and verifies that the node event queue of the
 * second cache's root region holds exactly 3 events (one each for /a,
 * /a/b and /a/b/c).
 *
 * @throws Exception if creating or starting either cache fails
 */
public void testEvictionSeesStateTransfer() throws Exception
{
   // First cache: started, registered under "evict1", then populated
   Configuration firstConfig = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
   Cache populatedCache = DefaultCacheFactory.getInstance().createCache(firstConfig, false);
   populatedCache.start();
   caches.put("evict1", populatedCache);

   Fqn populatedFqn = Fqn.fromString("/a/b/c");
   populatedCache.put(populatedFqn, "key", "value");

   // Second cache: started after the data exists, registered under "evict2"
   Configuration secondConfig = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
   Cache joiningCache = DefaultCacheFactory.getInstance().createCache(secondConfig, false);
   joiningCache.start();
   caches.put("evict2", joiningCache);

   // We expect events for /a, /a/b and /a/b/c
   Region rootRegion = joiningCache.getRegion(Fqn.ROOT, false);
   int queuedEvents = rootRegion.nodeEventQueueSize();
   assertEquals("Saw the expected number of node events", 3, queuedEvents);
}
/**
 * Further test for JBCACHE-913.
 * <p/>
 * Scenario exercised by this test:
 * <ol>
 * <li>Start a first cache and load 25000 nodes under /base plus 5 nodes
 * under /org/jboss/test/data.</li>
 * <li>Start a second cache and check that it sees all 5 data children and
 * at least 5000 base children.</li>
 * <li>Run one writer thread per cache that keeps adding nodes to both
 * subtrees, while this thread polls the second cache for up to 10 secs
 * waiting to see the child count of each subtree decrease.</li>
 * <li>Stop the writers, then check after further sleeps that the subtrees
 * have shrunk to the expected limits.</li>
 * </ol>
 *
 * @throws Exception
 */
public void testEvictionAfterStateTransfer() throws Exception
{
   Configuration c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
   Cache cache1 = DefaultCacheFactory.getInstance().createCache(c, false);
   cache1.start();
   caches.put("evict1", cache1);

   // Populate the first cache before the second one joins
   for (int i = 0; i < 25000; i++)
   {
      cache1.put(Fqn.fromString("/base/" + i), "key", "base" + i);
      if (i < 5)
      {
         cache1.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
      }
   }

   c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
   final Cache cache2 = DefaultCacheFactory.getInstance().createCache(c, false);
   cache2.start();
   caches.put("evict2", cache2);

   Node parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
   Set children = parent.getChildren();
   assertEquals("All data children transferred", 5, children.size());
   parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
   children = parent.getChildren();
   assertTrue("Minimum number of base children transferred", children.size() >= 5000);

   // Sleep 2.5 secs so the nodes we are about to create in data won't
   // exceed the 4 sec TTL when eviction thread runs
   TestingUtil.sleepThread(2500);

   /**
    * Writer thread that keeps adding nodes to /base and
    * /org/jboss/test/data until told to stop, remembering the most recent
    * exception it encountered.
    */
   class Putter extends Thread
   {
      Cache cache = null;
      // Both fields are written by one thread and read by another (the test
      // thread sets 'stopped', the putter thread sets 'ex'), so they must be
      // volatile; otherwise the putter may never observe the stop request
      // and the test thread may not observe a recorded exception.
      volatile boolean stopped = false;
      volatile Exception ex = null;

      public void run()
      {
         int i = 25000;
         while (!stopped)
         {
            try
            {
               cache.put(Fqn.fromString("/base/" + i), "key", "base" + i);
               cache.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
               i++;
            }
            catch (Exception e)
            {
               // Record the failure for the test thread to assert on; keep looping
               ex = e;
            }
         }
      }
   }

   Putter p1 = new Putter();
   p1.cache = cache1;
   p1.start();
   Putter p2 = new Putter();
   p2.cache = cache2;
   p2.start();

   Random rnd = new Random();
   TestingUtil.sleepThread(rnd.nextInt(200));

   int maxCountBase = 0;
   int maxCountData = 0;
   boolean sawBaseDecrease = false;
   boolean sawDataDecrease = false;
   long start = System.currentTimeMillis();

   // Poll for at most 10 secs, tracking the high-water mark of each subtree;
   // a count below the high-water mark means nodes were removed.
   while ((System.currentTimeMillis() - start) < 10000)
   {
      parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
      children = parent.getChildren();
      if (children != null)
      {
         int dataCount = children.size();
         if (dataCount < maxCountData)
         {
            System.out.println("data " + dataCount + " < " + maxCountData + " elapsed = " + (System.currentTimeMillis() - start));
            sawDataDecrease = true;
         }
         else
         {
            maxCountData = dataCount;
         }
      }

      parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
      children = parent.getChildren();
      if (children != null)
      {
         int baseCount = children.size();
         if (baseCount < maxCountBase)
         {
            System.out.println("base " + baseCount + " < " + maxCountBase + " elapsed = " + (System.currentTimeMillis() - start));
            sawBaseDecrease = true;
         }
         else
         {
            maxCountBase = baseCount;
         }
      }

      if (sawDataDecrease && sawBaseDecrease)
      {
         break;
      }

      TestingUtil.sleepThread(50);
   }

   // Ask both writers to stop and give each up to 1 sec to finish
   p1.stopped = true;
   p2.stopped = true;
   p1.join(1000);
   p2.join(1000);

   assertTrue("Saw data decrease", sawDataDecrease);
   assertTrue("Saw base decrease", sawBaseDecrease);
   assertNull("No exceptions in p1", p1.ex);
   assertNull("No exceptions in p2", p2.ex);

   // Sleep 5.1 secs so we are sure the eviction thread ran
   TestingUtil.sleepThread(5100);

   parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
   children = parent.getChildren();
   if (children != null)
   {
      System.out.println(children.size());
      assertTrue("Excess children evicted", children.size() <= 5);
   }

   parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
   children = parent.getChildren();
   if (children != null)
   {
      System.out.println(children.size());
      assertTrue("Excess children evicted", children.size() <= 25000);
   }

   // Sleep more to let the eviction thread run again,
   // which will evict all data nodes due to their ttl of 4 secs
   TestingUtil.sleepThread(8100);

   parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
   children = parent.getChildren();
   if (children != null)
   {
      assertEquals("All data children evicted", 0, children.size());
   }
}
/**
 * Cache user that calls <code>TestingUtil.sleepRandom(5000)</code>, then
 * activates the A_B region and writes a single node named after itself
 * beneath /a/b.
 */
private class CacheActivator extends CacheUser
{
   CacheActivator(Semaphore semaphore, String name, boolean sync) throws Exception
   {
      super(semaphore, name, sync, false);
   }

   /**
    * Sleeps, activates the region and stores "VALUE" under "KEY" in the
    * node /a/b/&lt;name&gt;.
    */
   void useCache() throws Exception
   {
      // presumably a random delay bounded by 5000 ms -- confirm in TestingUtil
      TestingUtil.sleepRandom(5000);
      createAndActivateRegion(cache, A_B);
      cache.put(Fqn.fromString("/a/b/" + name), "KEY", "VALUE");
   }

   /**
    * Reads the value stored under "KEY" in the given node of this user's cache.
    *
    * @param fqn node to read from
    * @return whatever the cache returns for "KEY" at that node
    */
   public Object getCacheValue(Fqn fqn) throws CacheException
   {
      Object value = cache.get(fqn, "KEY");
      return value;
   }
}
/**
 * Cache user that activates the regions /a/0 .. /a/(regionCount - 1) one
 * after another, writing a node named after itself into each region and
 * pausing between regions.
 */
private class StaggeredWebDeployerActivator extends CacheUser
{
   // Number of regions to activate; always overwritten by the constructor
   int regionCount = 15;

   StaggeredWebDeployerActivator(Semaphore semaphore, String name, boolean sync, int regionCount)
         throws Exception
   {
      super(semaphore, name, sync, false);
      this.regionCount = regionCount;
   }

   /**
    * For each region index: activates /a/&lt;index&gt;, stores "VALUE" under
    * "KEY" in /a/&lt;index&gt;/&lt;name&gt;, then calls
    * <code>TestingUtil.sleepThread(1000)</code>.
    */
   void useCache() throws Exception
   {
      int regionIndex = 0;
      while (regionIndex < regionCount)
      {
         String regionPath = "/a/" + regionIndex;
         createAndActivateRegion(cache, Fqn.fromString(regionPath));
         cache.put(Fqn.fromString(regionPath + "/" + name), "KEY", "VALUE");
         TestingUtil.sleepThread(1000);
         regionIndex++;
      }
   }

   /**
    * Reads the value stored under "KEY" in the given node of this user's cache.
    *
    * @param fqn node to read from
    * @return whatever the cache returns for "KEY" at that node
    */
   public Object getCacheValue(Fqn fqn) throws CacheException
   {
      Object value = cache.get(fqn, "KEY");
      return value;
   }
}
/**
 * Cache user that performs continuous puts into its own subtree
 * (/&lt;name&gt;/0 .. /&lt;name&gt;/(SUBTREE_SIZE - 1)) until puts are
 * paused via {@link #stopPuts()} or the thread is stopped via
 * {@link #stopThread()}.
 * <p/>
 * The control flags are set through the public methods and polled inside
 * {@link #useCache()}; they are volatile so that a change made by the
 * controlling thread is guaranteed to become visible to the polling loops.
 */
private class CacheStressor extends CacheUser
{
   private final Random random = new Random(System.currentTimeMillis());
   // Polled by useCache(), flipped by stopPuts()/startPuts()/stopThread();
   // must be volatile for cross-thread visibility.
   private volatile boolean putsStopped = false;
   private volatile boolean stopped = false;

   CacheStressor(Semaphore semaphore,
                 String name,
                 boolean sync)
         throws Exception
   {
      super(semaphore, name, sync, true);
   }

   /**
    * Alternates between a "putting" phase and an idle phase until stopped.
    * On every pass after the first one that has performed puts, the semaphore
    * is re-acquired (waiting up to 60 secs) before puts resume; it is released
    * again each time puts are paused.
    *
    * @throws Exception if the semaphore cannot be acquired within 60 secs
    */
   void useCache() throws Exception
   {
      // Do continuous puts into the cache. Use our own nodes,
      // as we're not testing conflicts between writer nodes,
      // just whether activation causes problems
      int factor = 0;
      int i = 0;
      Fqn fqn = null;
      boolean acquired = false;
      while (!stopped)
      {
         if (i > 0)
         {
            acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
            if (!acquired)
            {
               throw new Exception(name + " cannot acquire semaphore");
            }
         }

         // NOTE(review): this loop only checks putsStopped, not stopped;
         // callers appear to be expected to call stopPuts() before
         // stopThread() -- confirm against the tests using this class.
         while (!putsStopped)
         {
            factor = random.nextInt(50);
            // Node index is factor modulo SUBTREE_SIZE, stored value is the quotient
            fqn = Fqn.fromString("/" + name + "/" + String.valueOf(factor % SUBTREE_SIZE));
            Integer value = factor / SUBTREE_SIZE;
            cache.put(fqn, "KEY", value);
            TestingUtil.sleepThread((long) factor);
            i++;
         }

         System.out.println(name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));

         semaphore.release();

         // Go to sleep until directed otherwise
         while (!stopped && putsStopped)
         {
            TestingUtil.sleepThread((long) 100);
         }
      }
   }

   /**
    * Pauses the put loop; useCache() then releases the semaphore and idles.
    */
   public void stopPuts()
   {
      putsStopped = true;
   }

   /**
    * Lets an idling useCache() resume putting.
    */
   public void startPuts()
   {
      putsStopped = false;
   }

   /**
    * Requests termination of useCache() and interrupts the worker thread if
    * it is still alive.
    */
   public void stopThread()
   {
      stopped = true;
      if (thread.isAlive())
      {
         thread.interrupt();
      }
   }
}
}
More information about the jboss-cvs-commits
mailing list