[jboss-cvs] JBossCache/tests-50/functional/org/jboss/cache/pojo/statetransfer ...
Ben Wang
bwang at jboss.com
Tue Oct 10 09:35:47 EDT 2006
User: bwang
Date: 06/10/10 09:35:47
Added: tests-50/functional/org/jboss/cache/pojo/statetransfer
StateTransferAopTestBase.java
StateTransfer200AopTest.java ReplicatedTest.java
Log:
State transfer test case.
Revision Changes Path
1.1 date: 2006/10/10 13:35:47; author: bwang; state: Exp;JBossCache/tests-50/functional/org/jboss/cache/pojo/statetransfer/StateTransferAopTestBase.java
Index: StateTransferAopTestBase.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.pojo.statetransfer;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.Cache;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.pojo.PojoCache;
import org.jboss.cache.pojo.PojoCacheFactory;
import org.jboss.cache.pojo.test.Address;
import org.jboss.cache.pojo.test.Person;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.XmlConfigurationParser;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.misc.TestingUtil;
import org.jboss.cache.xml.XmlHelper;
import org.w3c.dom.Element;
import javax.transaction.TransactionManager;
import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
/**
* Tests state transfer in PojoCache.
*
* @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
* @version $Revision: 1.1 $
*/
public abstract class StateTransferAopTestBase extends TestCase
{
private Map caches;
public static final String A_B_1 = "/a/b/1";
public static final String A_B_2 = "/a/b/2";
public static final String A_C_1 = "/a/c/1";
public static final String A_C_2 = "/a/c/2";
public static final Fqn A_B_1_f = Fqn.fromString("/a/b/1");
public static final Fqn A_B_2_f = Fqn.fromString("/a/b/2");
public static final Fqn A_C_1_f = Fqn.fromString("/a/c/1");
public static final Fqn A_C_2_f = Fqn.fromString("/a/c/2");
private static final int SUBTREE_SIZE = 10;
private Person joe;
private Person bob;
private Person jane;
private Person jill;
private Address addr1;
private Address addr2;
public static final Integer TWENTY = new Integer(20);
public static final Integer TWENTYFIVE = new Integer(25);
public static final Integer FORTY = new Integer(40);
private Log log = LogFactory.getLog(StateTransferAopTestBase.class);
/**
 * Attaches four Persons to a synchronously replicated cache, then creates a
 * second cache and checks that every Person can be found there and that
 * Persons sharing an Address still share a single Address instance.
 *
 * @throws Exception if creating or using either cache fails
 */
public void testInitialStateTransfer() throws Exception
{
   log.info("Enter testInitialStateTransfer");

   PojoCache source = createCache("cache1", true, false, false);
   source.attach(A_B_1, joe);
   source.attach(A_B_2, jane);
   source.attach(A_C_1, bob);
   source.attach(A_C_2, jill);

   // The receiver is created only after the data exists on the source.
   // No explicit wait for cluster views is performed in this test.
   PojoCache receiver = createCache("cache2", true, false, false);

   Person joeCopy = (Person) receiver.find(A_B_1);
   Person janeCopy = (Person) receiver.find(A_B_2);
   Person bobCopy = (Person) receiver.find(A_C_1);
   Person jillCopy = (Person) receiver.find(A_C_2);

   // Residents of /a/b share addr1
   assertEquals("Name for /a/b/1 is Joe", joe.getName(), joeCopy.getName());
   assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), joeCopy.getAddress().getCity());
   assertEquals("Name for /a/b/2 is Jane", jane.getName(), janeCopy.getName());
   assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), janeCopy.getAddress().getCity());
   boolean anytownShared = joeCopy.getAddress() == janeCopy.getAddress();
   assertTrue("Joe and Jane have same Address", anytownShared);

   // Residents of /a/c share addr2
   assertEquals("Name for /a/c/1 is Bob", bob.getName(), bobCopy.getName());
   assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), bobCopy.getAddress().getCity());
   assertEquals("Name for /a/c/2 is Jill", jill.getName(), jillCopy.getName());
   assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), jillCopy.getAddress().getCity());
   boolean fremontShared = bobCopy.getAddress() == jillCopy.getAddress();
   assertTrue("Bob and Jill have same Address", fremontShared);
}
/**
 * Same scenario as the initial state transfer test, but both caches are
 * asynchronously replicated and configured with a cache loader. Checks the
 * second cache's loader content first, then the Persons found through the
 * second cache itself.
 *
 * @throws Exception if creating or using either cache fails
 */
public void testInitialStateTferWithLoader() throws Exception
{
   log.info("Enter testInitialStateTferWithLoader");

   PojoCache cache1 = createCache("cache1", false, false, true);
   cache1.attach(A_B_1, joe);
   cache1.attach(A_B_2, jane);
   cache1.attach(A_C_1, bob);
   cache1.attach(A_C_2, jill);

   PojoCache cache2 = createCache("cache2", false, false, true);

   // Pause to give caches time to see each other
   TestingUtil.blockUntilViewsReceived(new Cache[]
         {cache1.getCache(), cache2.getCache()}, 60000);

   // Inspect the persistent store of the receiving cache directly
   CacheLoader loader = ((CacheSPI) cache2.getCache()).getCacheLoader();
   assertEquals("Loader name for /a/b/1 is Joe", joe.getName(), loader.get(A_B_1_f).get("name"));
   assertEquals("Loader name for /a/b/2 is Jane", jane.getName(), loader.get(A_B_2_f).get("name"));
   // Message previously claimed "Joe" although the expected value is Bob's name
   assertEquals("Loader name for /a/c/1 is Bob", bob.getName(), loader.get(A_C_1_f).get("name"));
   assertEquals("Loader name for /a/c/2 is Jill", jill.getName(), loader.get(A_C_2_f).get("name"));

   Person ab1 = (Person) cache2.find(A_B_1);
   Person ab2 = (Person) cache2.find(A_B_2);
   Person ac1 = (Person) cache2.find(A_C_1);
   Person ac2 = (Person) cache2.find(A_C_2);

   assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
   assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
   assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
   assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
   assertTrue("Joe and Jane have same Address", ab1.getAddress() == ab2.getAddress());

   assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
   assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
   assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
   assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
   assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
}
/**
 * Exercises region-by-region state transfer with region based marshalling.
 * The first cache activates /a and attaches Persons; the second cache must
 * not see a region's content until it activates that region itself.
 * Finally the first cache deactivates /a, activates the four leaf regions
 * and its own content is re-checked.
 *
 * @throws Exception if creating or using either cache fails
 */
public void testPartialStateTransfer() throws Exception
{
   log.info("Enter testPartialStateTransfer");

   PojoCache cache1 = createCache("cache1", false, true, false);
   cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();
   cache1.attach(A_B_1, joe);
   cache1.attach(A_B_2, jane);

   PojoCache cache2 = createCache("cache2", false, true, false);

   // Pause to give caches time to see each other
   TestingUtil.blockUntilViewsReceived(new Cache[]
         {cache1.getCache(), cache2.getCache()}, 60000);

   // Nothing is visible on cache2 while its regions are inactive
   assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
   assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));

   cache2.getCache().getRegion(Fqn.fromString("/a/b"), true).activate();

   Person ab1 = (Person) cache2.find(A_B_1);
   Person ab2 = (Person) cache2.find(A_B_2);
   assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
   assertEquals("City for /a/b/1 is Anytown", joe.getAddress().getCity(), ab1.getAddress().getCity());
   assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
   assertEquals("City for /a/b/2 is Anytown", jane.getAddress().getCity(), ab2.getAddress().getCity());
   assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());

   cache1.attach(A_C_1, bob);
   cache1.attach(A_C_2, jill);

   // /a/c is still inactive on cache2
   assertNull("/a/c/1 not transferred per policy", cache2.find(A_C_1));

   cache2.getCache().getRegion(Fqn.fromString("/a/c"), true).activate();

   Person ac1 = (Person) cache2.find(A_C_1);
   Person ac2 = (Person) cache2.find(A_C_2);
   assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
   assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
   assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
   assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
   assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());

   // Swap the single /a region on cache1 for four leaf regions
   cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();
   cache1.getCache().getRegion(Fqn.fromString("/a/b/1"), true).activate();
   cache1.getCache().getRegion(Fqn.fromString("/a/b/2"), true).activate();
   cache1.getCache().getRegion(Fqn.fromString("/a/c/1"), true).activate();
   cache1.getCache().getRegion(Fqn.fromString("/a/c/2"), true).activate();

   ab1 = (Person) cache1.find(A_B_1);
   assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
   assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());

   ab2 = (Person) cache1.find(A_B_2);
   // Messages previously named /a/b/1 although /a/b/2 is being checked
   assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
   assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
   assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());

   ac1 = (Person) cache1.find(A_C_1);
   assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
   assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());

   ac2 = (Person) cache1.find(A_C_2);
   assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
   assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
   assertTrue("Address for Bob and Jill is the same object", ac1.getAddress() == ac2.getAddress());
}
/**
 * Region-by-region state transfer with region based marshalling and a cache
 * loader on both caches. Before activation, neither the second cache nor
 * its loader may expose the Persons attached on the first cache; after
 * activation both the loader content and the found Persons are verified.
 *
 * @throws Exception if creating or using either cache fails
 */
public void testPartialStateTransferWithLoader() throws Exception
{
   log.info("Enter testPartialStateTransferWithLoader");

   PojoCache cache1 = createCache("cache1", false, true, true);
   cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();
   cache1.attach(A_B_1, joe);
   cache1.attach(A_B_2, jane);

   PojoCache cache2 = createCache("cache2", false, true, true);

   // Pause to give caches time to see each other
   TestingUtil.blockUntilViewsReceived(new Cache[]
         {cache1.getCache(), cache2.getCache()}, 60000);

   CacheLoader loader = ((CacheSPI) cache2.getCache()).getCacheLoader();

   // The loader may or may not hold a node; if it does, it must carry no Person data
   Map map = loader.get(A_B_1_f);
   if (map != null)
   {
      assertNull("/a/b/1 name not transferred per policy", map.get("name"));
      assertNull("/a/b/1 age not transferred per policy", map.get("age"));
   }
   map = loader.get(A_B_2_f);
   if (map != null)
   {
      // Messages previously named /a/b/1 although /a/b/2 is being checked
      assertNull("/a/b/2 name not transferred per policy", map.get("name"));
      assertNull("/a/b/2 age not transferred per policy", map.get("age"));
   }

   assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
   assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));

   // NOTE(review): testPartialStateTransfer activates /a/b on cache2 at this
   // point; here cache1 is used although cache2's loader is asserted next.
   // Left unchanged -- confirm whether cache2 was intended.
   cache1.getCache().getRegion(Fqn.fromString("/a/b"), true).activate();

   assertEquals("Correct name from loader for /a/b/1", joe.getName(), loader.get(A_B_1_f).get("name"));
   assertEquals("Correct age from loader for /a/b/1", TWENTY, loader.get(A_B_1_f).get("age"));
   assertEquals("Correct name from loader for /a/b/2", jane.getName(), loader.get(A_B_2_f).get("name"));
   assertEquals("Correct age from loader for /a/b/2", TWENTYFIVE, loader.get(A_B_2_f).get("age"));

   Person ab1 = (Person) cache2.find(A_B_1);
   Person ab2 = (Person) cache2.find(A_B_2);
   assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
   assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
   assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
   assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
   assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());

   cache1.attach(A_C_1, bob);
   cache1.attach(A_C_2, jill);

   assertNull("/a/c/1 not transferred per policy", cache2.find(A_C_1));

   // NOTE(review): same cache1-vs-cache2 question as for /a/b above.
   cache1.getCache().getRegion(Fqn.fromString("/a/c"), true).activate();

   assertEquals("Correct name from loader for /a/b/1", joe.getName(), loader.get(A_B_1_f).get("name"));
   assertEquals("Correct age from loader for /a/b/1", TWENTY, loader.get(A_B_1_f).get("age"));
   assertEquals("Correct name from loader for /a/b/2", jane.getName(), loader.get(A_B_2_f).get("name"));
   assertEquals("Correct age from loader for /a/b/2", TWENTYFIVE, loader.get(A_B_2_f).get("age"));
   assertEquals("Correct name from loader for /a/c/1", bob.getName(), loader.get(A_C_1_f).get("name"));
   assertEquals("Correct age from loader for /a/c/1", FORTY, loader.get(A_C_1_f).get("age"));
   assertEquals("Correct name from loader for /a/c/2", jill.getName(), loader.get(A_C_2_f).get("name"));
   assertEquals("Correct age from loader for /a/c/2", TWENTYFIVE, loader.get(A_C_2_f).get("age"));

   Person ac1 = (Person) cache2.find(A_C_1);
   Person ac2 = (Person) cache2.find(A_C_2);
   assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
   assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
   assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
   assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
   assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());

   // Swap the single /a region on cache1 for four leaf regions
   cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();
   cache1.getCache().getRegion(Fqn.fromString("/a/b/1"), true).activate();
   cache1.getCache().getRegion(Fqn.fromString("/a/b/2"), true).activate();
   cache1.getCache().getRegion(Fqn.fromString("/a/c/1"), true).activate();
   cache1.getCache().getRegion(Fqn.fromString("/a/c/2"), true).activate();

   ab1 = (Person) cache1.find(A_B_1);
   assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
   assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());

   ab2 = (Person) cache1.find(A_B_2);
   assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
   assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
   assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());

   ac1 = (Person) cache1.find(A_C_1);
   assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
   assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());

   ac2 = (Person) cache1.find(A_C_2);
   assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
   assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
   assertTrue("Address for Bob and Jill is the same object", ac1.getAddress() == ac2.getAddress());
}
/**
 * Runs the concurrent activation scenario with synchronous replication
 * (REPL_SYNC): several caches activate the same subtree at once, as could
 * happen with a farmed deployment. The scenario itself lives in
 * <code>concurrentActivationTest</code>.
 *
 * @throws Exception if the scenario propagates one
 */
public void testConcurrentActivationSync() throws Exception
{
   final boolean useSyncReplication = true;
   log.info("Enter testConcurrentActivationSync");
   concurrentActivationTest(useSyncReplication);
}
/**
 * Runs the concurrent activation scenario with asynchronous replication
 * (REPL_ASYNC): several caches activate the same subtree at once, as could
 * happen with a farmed deployment. The scenario itself lives in
 * <code>concurrentActivationTest</code>.
 *
 * @throws Exception if the scenario propagates one
 */
public void testConcurrentActivationAsync() throws Exception
{
   final boolean useSyncReplication = false;
   log.info("Enter testConcurrentActivationAsync");
   concurrentActivationTest(useSyncReplication);
}
/**
 * Starts 5 caches and then concurrently activates the same region under
 * all 5, causing each to attempt a partial state transfer from the others.
 * As soon as each cache has activated its region, it attaches a Person to a
 * node in the region, thus complicating the lives of the other caches
 * trying to get partial state.
 * <p/>
 * Failure condition is if any activator recorded an exception or if any
 * cache is missing, or has wrong data for, a Person attached by any
 * activator.
 *
 * @param sync use REPL_SYNC (true) or REPL_ASYNC (false)
 * @throws Exception declared for callers; exceptions raised inside the
 *                   scenario are logged and converted to a test failure
 */
private void concurrentActivationTest(boolean sync) throws Exception
{
   String[] names = {"A", "B", "C", "D", "E"};
   int count = names.length;
   CacheActivator[] activators = new CacheActivator[count];
   try
   {
      // Create a semaphore and take all its tickets
      Semaphore semaphore = new Semaphore(count);
      for (int i = 0; i < count; i++)
      {
         semaphore.acquire();
      }

      // Create activation threads that will block on the semaphore
      Cache[] caches = new Cache[count];
      for (int i = 0; i < count; i++)
      {
         activators[i] = new CacheActivator(semaphore, names[i], sync);
         caches[i] = activators[i].getCache();
         activators[i].start();
      }

      // Make sure everyone is in sync
      TestingUtil.blockUntilViewsReceived(caches, 60000);

      // Release the semaphore to allow the threads to start work
      semaphore.release(count);

      // Sleep to ensure the threads get all the semaphore tickets
      TestingUtil.sleepThread(1000);

      // Reacquire the semaphore tickets; when we have them all
      // we know the threads are done
      for (int i = 0; i < count; i++)
      {
         boolean acquired = semaphore.attempt(60000);
         if (!acquired)
            fail("failed to acquire semaphore " + i);
      }

      // Sleep to allow any async calls to clear
      if (!sync)
         TestingUtil.sleepThread(500);

      // Ensure the caches held by the activators see all the values
      for (int i = 0; i < count; i++)
      {
         assertNull("Activator " + names[i] + " caught an exception",
               activators[i].getException());

         for (int j = 0; j < count; j++)
         {
            String fqn = "/a/b/" + names[j];
            Person p = (Person) activators[i].getCacheValue(fqn);
            assertNotNull(names[i] + ":" + fqn + " is not null", p);
            assertEquals("Correct name for " + names[i] + ":" + fqn,
                  "Person " + names[j], p.getName());
            assertEquals("Correct street for " + names[i] + ":" + fqn,
                  names[j] + " Test Street", p.getAddress().getStreet());
         }
      }
   }
   catch (Exception ex)
   {
      // Log first so the stack trace survives the conversion to a failure
      log.error("concurrentActivationTest caught an exception", ex);
      fail(ex.getLocalizedMessage());
   }
   finally
   {
      for (int i = 0; i < count; i++)
      {
         // A slot is still null if an earlier constructor call threw;
         // skipping it avoids masking the real failure with an NPE
         if (activators[i] != null)
            activators[i].cleanup();
      }
   }
}
/**
 * Intended to test partial state transfer under heavy concurrent load and
 * REPL_SYNC. Currently disabled: the only executed statement is the log
 * call, and the scenario invocation below is commented out.
 *
 * @throws Exception declared for the disabled scenario call
 */
public void testConcurrentUseSync() throws Exception
{
log.info("Enter testConcurrentUseSync");
// Disabled; note the scenario method in this class is named XconcurrentUseTest.
// concurrentUseTest(true);
}
/**
 * Intended to test partial state transfer under heavy concurrent load and
 * REPL_ASYNC. Currently disabled: the only executed statement is the log
 * call, and the scenario invocation below is commented out.
 *
 * @throws Exception declared for the disabled scenario call
 */
public void testConcurrentUseAsync() throws Exception
{
log.info("Enter testConcurrentUseAsync");
// Disabled; note the scenario method in this class is named XconcurrentUseTest.
// concurrentUseTest(false);
}
/**
 * Initiates 5 caches: cacheA plus four caches owned by CacheStressor
 * threads. Each stressor rapidly attaches and mutates Persons in a subtree
 * named after it. cacheA deactivates and then activates each subtree, and
 * at the end the test confirms no stressor saw an exception and compares
 * cacheA's content for each subtree. The whole cycle runs twice.
 * <p/>
 * This method is not called from within this class: the two callers have
 * their invocation commented out.
 *
 * @param sync whether to use REPL_SYNC or REPL_ASYNC
 * @throws Exception any exception recorded by a stressor, or raised while
 *                   creating or using the caches
 */
private void XconcurrentUseTest(boolean sync) throws Exception
{
String[] names = {"B", "C", "D", "E"};
int count = names.length;
CacheStressor[] stressors = new CacheStressor[count];
try
{
// cacheA: region based marshalling on, no loader, inactiveOnStartup false
PojoCache cacheA = createCache("cacheA", sync, true, false, false);
Cache[] caches = new Cache[count + 1];
caches[0] = cacheA.getCache();
// Create a semaphore and take all its tickets
Semaphore semaphore = new Semaphore(count);
for (int i = 0; i < count; i++)
{
semaphore.acquire();
}
// Create stressor threads that will block on the semaphore
for (int i = 0; i < count; i++)
{
stressors[i] = new CacheStressor(semaphore, names[i], sync);
caches[i + 1] = stressors[i].getCache();
stressors[i].start();
// Give each one a chance to stabilize
TestingUtil.sleepThread(100);
}
// Make sure everyone's views are in sync
TestingUtil.blockUntilViewsReceived(caches, 60000);
// Repeat the basic test two times in order to involve inactivation
for (int x = 0; x < 2; x++)
{
// if (x > 0)
// {
// Reset things by inactivating the region
// and enabling the stressors
for (int i = 0; i < count; i++)
{
cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).deactivate();
log.info("TEST: Run " + x + "-- /" + names[i] + " inactivated on A");
stressors[i].startPuts();
}
// }
// Release the semaphore to allow the threads to start work
semaphore.release(count);
// Sleep to ensure the threads get all the semaphore tickets
// and to ensure puts are actively in progress
TestingUtil.sleepThread(300);
// Activate cacheA
for (int i = 0; i < count; i++)
{
log.info("TEST: Activating /" + names[i] + " on A");
cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).activate();
// Stop the stressor so we don't pollute cacheA's state
// with too many messages sent after activation -- we want
// to compare transferred state with the sender
stressors[i].stopPuts();
log.info("TEST: Run " + x + "-- /" + names[i] + " activated on A");
// Reacquire one semaphore ticket
boolean acquired = semaphore.attempt(60000);
if (!acquired)
fail("failed to acquire semaphore " + names[i]);
log.info("TEST: Run " + x + "-- acquired semaphore from " + names[i]);
// Pause to allow other work to proceed
TestingUtil.sleepThread(100);
}
// Sleep to allow any in transit msgs to clear
if (!sync)
TestingUtil.sleepThread(2000);
// Ensure the stressors saw no exceptions
for (int i = 0; i < count; i++)
{
Exception e = stressors[i].getException();
if (e != null)
{
log.error("Stressor " + names[i] + " caught an exception",
e);
throw e;
}
}
// log.info("Cache A details:\n" + cacheA.printDetails());
// Compare cache contents
Person p1 = null;
Person p2 = null;
for (int i = 0; i < count; i++)
{
// log.info("Cache " + names[i] + " details:\n" +
// stressors[i].getTreeCache().printDetails());
for (int j = 0; j < SUBTREE_SIZE; j++)
{
String fqn = "/" + names[i] + "/" + j;
log.info("TEST: Getting A:" + fqn);
p1 = (Person) cacheA.find(fqn);
boolean p1Null = p1 == null;
log.info("TEST: Getting " + names[i] + ":" + fqn);
// NOTE(review): with the lookup below commented out, p2 is never
// assigned, so p2Null is always true and the null-status assertion
// fails whenever cacheA actually holds a Person at fqn.
// p2 = (Person) stressors[i].getCache().find(fqn);
boolean p2Null = p2 == null;
assertEquals("Run " + x + ": " + fqn +
" null status matches", p1Null, p2Null);
if (!p1Null)
{
assertEquals("Run " + x + ": A:" + fqn + " age matches " + names[i] + ":" + fqn,
p1.getAge(), p2.getAge());
assertEquals("Run " + x + ": A:" + fqn + " name matches " + names[i] + ":" + fqn,
p1.getName(), p2.getName());
assertEquals("Run " + x + ": A:" + fqn + " address matches " + names[i] + ":" + fqn,
p1.getAddress().getStreet(),
p2.getAddress().getStreet());
}
}
}
}
// Both runs done: tell every stressor thread to finish
for (int i = 0; i < count; i++)
stressors[i].stopThread();
}
finally
{
// Interrupt any stressor thread that is still alive; slots may be null
// if construction failed part-way through
for (int i = 0; i < count; i++)
{
if (stressors[i] != null)
stressors[i].cleanup();
}
}
}
/**
 * Convenience overload that delegates to the five-argument
 * <code>createCache</code> with <code>inactiveOnStartup</code> set to true.
 *
 * @param cacheID        key under which the cache is tracked for tearDown
 * @param sync           true selects the replSync configuration file,
 *                       false the replAsync one
 * @param useMarshalling whether region based marshalling is enabled
 * @param useCacheLoader whether a file based cache loader is configured
 * @return the cache produced by the five-argument overload
 * @throws Exception if the delegate throws
 */
protected PojoCache createCache(String cacheID, boolean sync, boolean useMarshalling, boolean useCacheLoader)
      throws Exception
{
   final boolean inactiveOnStartup = true;
   return createCache(cacheID, sync, useMarshalling, useCacheLoader, inactiveOnStartup);
}
/**
 * Builds a PojoCache from the sync or async replication configuration file
 * and records it in <code>caches</code> under <code>cacheID</code> so that
 * tearDown can stop it.
 *
 * @param cacheID           unique key for the cache within one test
 * @param sync              true selects META-INF/replSync-service.xml,
 *                          false META-INF/replAsync-service.xml
 * @param useMarshalling    enables region based marshalling; only then is
 *                          <code>inactiveOnStartup</code> applied
 * @param useCacheLoader    adds a file based cache loader configuration
 * @param inactiveOnStartup value for the inactive-on-startup setting
 * @return the created cache
 * @throws IllegalStateException if <code>cacheID</code> is already in use
 * @throws Exception             if parsing or cache creation fails
 */
protected PojoCache createCache(String cacheID, boolean sync,
                                boolean useMarshalling,
                                boolean useCacheLoader,
                                boolean inactiveOnStartup)
      throws Exception
{
   Object existing = caches.get(cacheID);
   if (existing != null)
   {
      throw new IllegalStateException(cacheID + " already created");
   }

   String configFile;
   if (sync)
   {
      configFile = "META-INF/replSync-service.xml";
   }
   else
   {
      configFile = "META-INF/replAsync-service.xml";
   }

   XmlConfigurationParser configParser = new XmlConfigurationParser();
   Configuration config = configParser.parseFile(configFile);
   config.setClusterName("StateTransferTestBase");
   config.setReplVersionString(getReplicationVersion());
   // Use a long timeout to facilitate setting debugger breakpoints
   config.setInitialStateRetrievalTimeout(60000);

   if (useMarshalling)
   {
      config.setUseRegionBasedMarshalling(true);
      config.setInactiveOnStartup(inactiveOnStartup);
   }
   if (useCacheLoader)
   {
      configureCacheLoader(config, cacheID, useMarshalling);
   }

   // NOTE(review): the second argument presumably asks the factory to start
   // the cache; if so, a failure here happens before the cache is registered
   // below and tearDown cannot destroy it -- confirm.
   PojoCache created = PojoCacheFactory.createInstance(config, true);
   caches.put(cacheID, created);
   return created;
}
/**
 * Prepares an empty directory for the given cache under the temp location
 * and installs a file based cache loader configuration pointing at it.
 *
 * @param c           configuration to receive the cache loader config
 * @param cacheID     cache identifier, used to derive the directory
 * @param useExtended true selects FileExtendedCacheLoader, false
 *                    FileCacheLoader
 * @throws Exception if building the loader configuration fails
 */
protected void configureCacheLoader(Configuration c, String cacheID, boolean useExtended) throws Exception
{
   String rawLocation = getTempLocation(cacheID);

   // Do cleanup in case it failed before
   File storeDir = new File(rawLocation);
   cleanFile(storeDir);
   storeDir.mkdir();

   String escapedLocation = escapeWindowsPath(rawLocation);
   String loaderClass = useExtended
         ? "org.jboss.cache.loader.FileExtendedCacheLoader"
         : "org.jboss.cache.loader.FileCacheLoader";
   Element loaderConfig = getCacheLoaderConfig(loaderClass, escapedLocation);
   c.setCacheLoaderConfiguration(loaderConfig);
}
/**
 * Assembles the cache loader XML configuration for one loader: passivation
 * off, no preload, synchronous, fetching persistent state and honouring
 * modifications.
 *
 * @param cl  fully qualified cache loader class name
 * @param loc value written to the loader's <code>location</code> property
 * @return the parsed configuration element
 * @throws Exception if the XML cannot be converted to an element
 */
protected Element getCacheLoaderConfig(String cl, String loc) throws Exception
{
   StringBuffer doc = new StringBuffer();
   doc.append(" <config>\n");
   doc.append(" \n");
   doc.append(" <passivation>false</passivation>\n");
   doc.append(" <preload></preload>\n");
   doc.append("\n");
   doc.append(" <cacheloader>\n");
   doc.append(" <class>").append(cl).append("</class>\n");
   doc.append(" <properties>\n");
   doc.append(" location=").append(loc).append("\n");
   doc.append(" </properties>\n");
   doc.append(" <async>false</async>\n");
   doc.append(" <fetchPersistentState>true</fetchPersistentState>\n");
   doc.append(" <ignoreModifications>false</ignoreModifications>\n");
   doc.append(" </cacheloader>\n");
   doc.append(" \n");
   doc.append(" </config>");
   return XmlHelper.stringToElement(doc.toString());
}
/**
 * Resolves the per-cache directory beneath the JVM temp directory
 * (<code>java.io.tmpdir</code>, falling back to <code>c:\tmp</code>).
 *
 * @param cacheID cache identifier used as the directory name
 * @return absolute path of the directory for that cache
 */
protected String getTempLocation(String cacheID)
{
   File tempRoot = new File(System.getProperty("java.io.tmpdir", "c:\\tmp"));
   return new File(tempRoot, cacheID).getAbsolutePath();
}
/**
 * Doubles every backslash in the path when the platform separator is not
 * a forward slash; on forward-slash platforms the path is returned as is.
 *
 * @param path file system path to escape
 * @return the path with each backslash doubled, or the unchanged path
 */
protected String escapeWindowsPath(String path)
{
   if (File.separatorChar == '/')
   {
      return path;
   }

   StringBuffer escaped = new StringBuffer();
   for (int pos = 0; pos < path.length(); pos++)
   {
      char ch = path.charAt(pos);
      if (ch == '\\')
      {
         escaped.append("\\\\");
      }
      else
      {
         escaped.append(ch);
      }
   }
   return escaped.toString();
}
/**
 * Supplies the replication version string that <code>createCache</code>
 * applies to every cache configuration via
 * <code>setReplVersionString</code>.
 *
 * @return the replication version string chosen by the concrete test
 */
protected abstract String getReplicationVersion();
/**
 * Resets the cache registry and builds the fixture: two Addresses and four
 * Persons. Joe and Jane share <code>addr1</code> (Anytown); Bob and Jill
 * share <code>addr2</code> (Fremont). Each Person gets its own skill set.
 *
 * @throws Exception if the superclass setUp fails
 */
protected void setUp() throws Exception
{
   super.setUp();

   caches = new HashMap();

   addr1 = new Address();
   addr1.setStreet("101 Oakview Dr");
   addr1.setCity("Anytown");
   addr1.setZip(11111);

   addr2 = new Address();
   addr2.setStreet("222 Happy Dr");
   addr2.setCity("Fremont");
   addr2.setZip(22222);

   joe = new Person();
   joe.setName("Joe");
   joe.setAge(TWENTY.intValue());
   joe.setAddress(addr1);
   Set skills = new HashSet();
   skills.add("TENNIS");
   skills.add("CARPENTRY");
   joe.setSkills(skills);

   jane = new Person();
   jane.setName("Jane");
   jane.setAge(TWENTYFIVE.intValue());
   jane.setAddress(addr1);
   skills = new HashSet();
   skills.add("JUJITSU");
   skills.add("MACRAME");
   jane.setSkills(skills);

   bob = new Person();
   bob.setName("Bob");
   bob.setAge(FORTY.intValue());
   bob.setAddress(addr2);
   skills = new HashSet();
   skills.add("LANGUAGES");
   skills.add("LAWN BOWLING");
   bob.setSkills(skills);

   jill = new Person();
   jill.setName("Jill");
   jill.setAge(TWENTYFIVE.intValue());
   jill.setAddress(addr2);
   skills = new HashSet();
   skills.add("FORTRAN");
   skills.add("COBOL");
   // Previously assigned to jane, which replaced Jane's skills and left
   // Jill without any
   jill.setSkills(skills);
}
/**
 * Stops every cache registered during the test and removes the temp
 * directory associated with its ID.
 *
 * @throws Exception if the superclass tearDown fails
 */
protected void tearDown() throws Exception
{
   super.tearDown();

   // Work from a snapshot of the registered IDs
   String[] ids = (String[]) caches.keySet().toArray(new String[caches.size()]);
   int next = 0;
   while (next < ids.length)
   {
      String id = ids[next++];
      stopCache((PojoCache) caches.get(id));
      cleanFile(new File(getTempLocation(id)));
   }
}
/**
 * Stops and destroys the given cache, logging rather than propagating any
 * exception so that remaining cleanup can continue.
 *
 * @param cache cache to shut down; null is ignored
 */
protected void stopCache(PojoCache cache)
{
   if (cache == null)
   {
      return;
   }

   try
   {
      cache.stop();
      cache.destroy();
   }
   catch (Exception failure)
   {
      log.error("Exception stopping cache " + failure.getMessage(), failure);
   }
}
/**
 * Recursively deletes a file or directory. Children are removed first; if
 * the entry still exists after the delete attempt it is scheduled for
 * deletion on JVM exit.
 *
 * @param file file or directory to remove
 */
protected void cleanFile(File file)
{
   // listFiles() yields null for anything that is not a listable directory
   File[] entries = file.listFiles();
   int entryCount = (entries == null) ? 0 : entries.length;
   for (int idx = 0; idx < entryCount; idx++)
   {
      cleanFile(entries[idx]);
   }

   if (!file.exists())
   {
      return;
   }
   file.delete();
   if (file.exists())
   {
      file.deleteOnExit();
   }
}
/**
 * Cache user that activates the /a/b region on its cache and then attaches
 * one Person, named after the activator, beneath that region.
 */
private class CacheActivator extends CacheUser
{
   /**
    * Creates the activator with the root not activated on startup.
    *
    * @param semaphore gate shared with the test thread
    * @param name      activator name, also used as cache ID and node name
    * @param sync      whether the cache replicates synchronously
    * @throws Exception if the underlying cache cannot be created
    */
   CacheActivator(Semaphore semaphore,
                  String name,
                  boolean sync)
         throws Exception
   {
      super(semaphore, name, sync, false);
   }

   /**
    * Activates /a/b, pauses for a millisecond, then attaches this
    * activator's Person at /a/b/&lt;name&gt;. No transaction is started.
    *
    * @throws Exception if activation or the attach fails
    */
   void useCache() throws Exception
   {
      cache.getCache().getRegion(Fqn.fromString("/a/b"), true).activate();
      log.info("TEST: " + name + " activated region" + " " + System.currentTimeMillis());

      String target = "/a/b/" + name;

      Person resident = new Person();
      resident.setName("Person " + name);

      Address home = new Address();
      home.setStreet(name + " Test Street");
      home.setCity(name + ", CA");
      resident.setAddress(home);

      TestingUtil.sleepThread(1);

      cache.attach(target, resident);
      log.info("TEST: " + name + " put fqn " + target + " " + System.currentTimeMillis());
   }

   /**
    * Looks up the object attached at the given location in this
    * activator's cache.
    *
    * @param fqn string form of the node name
    * @return whatever <code>find</code> returns for that location
    * @throws CacheException if the lookup fails
    */
   public Object getCacheValue(String fqn) throws CacheException
   {
      return cache.find(fqn);
   }
}
/**
 * Cache user that keeps attaching and mutating Persons under its own
 * subtree (/&lt;name&gt;/0 .. /&lt;name&gt;/SUBTREE_SIZE-1) until told to
 * stop. The test thread steers it through <code>startPuts</code>,
 * <code>stopPuts</code> and <code>stopThread</code>.
 */
private class CacheStressor extends CacheUser
{
   private Random random;
   // Written by the test thread and polled by the stressor thread, hence
   // volatile so a change is guaranteed to become visible to the poller
   private volatile boolean putsStopped = false;
   private volatile boolean stopped = false;

   /**
    * Creates the stressor with the root activated on startup.
    *
    * @param semaphore gate shared with the test thread
    * @param name      stressor name, also used as cache ID and subtree name
    * @param sync      whether the cache replicates synchronously
    * @throws Exception if the underlying cache cannot be created
    */
   CacheStressor(Semaphore semaphore,
                 String name,
                 boolean sync)
         throws Exception
   {
      super(semaphore, name, sync, true);

      random = new Random(System.currentTimeMillis() + name.hashCode());
   }

   /**
    * Runs put cycles until <code>stopped</code> is set. Within a cycle it
    * attaches each Person the first time its index comes up and afterwards
    * changes either the age or the street of the already attached Person.
    * When puts are stopped it releases one semaphore ticket and idles until
    * puts are restarted or the thread is stopped; every cycle after the
    * first starts by reacquiring a ticket.
    *
    * @throws Exception if a ticket cannot be acquired within 60 seconds or
    *                   a cache operation fails
    */
   void useCache() throws Exception
   {
      // Do lots of puts into the cache. Use our own nodes,
      // as we're not testing conflicts between writer nodes,
      // just whether activation causes problems
      int factor = 0;
      int i = 0;
      String fqn = null;

      Address addr1 = new Address();
      addr1.setStreet("1 Test Street");
      addr1.setCity("TestOne, CA");

      Address addr2 = new Address();
      addr2.setStreet("2 Test Street");
      addr2.setCity("TestTwo, CA");

      Person[] people = new Person[SUBTREE_SIZE];
      boolean[] loaded = new boolean[SUBTREE_SIZE];
      for (int j = 0; j < SUBTREE_SIZE; j++)
      {
         Person p = new Person();
         p.setName("Person " + j);
         p.setAge(j);
         // Even indexes share addr1, odd indexes share addr2
         p.setAddress((j % 2 == 0) ? addr1 : addr2);
         people[j] = p;
      }

      // The ticket taken in run() is held when this method is entered
      boolean acquired = true;
      try
      {
         while (!stopped)
         {
            if (i > 0)
            {
               acquired = semaphore.attempt(60000);
               if (!acquired)
                  throw new Exception(name + " cannot acquire semaphore");
               log.info("TEST: " + name + " reacquired semaphore");
               System.out.println("TEST: " + name + " reacquired semaphore");
            }

            int lastIndex = -1;
            int index = -1;

            while (!putsStopped)
            {
               // Ensure we don't operate on the same address twice in a row
               // otherwise deadlock detection sometimes causes
               // the _put for the second call to precede the commit
               // for the first, leading to deadlock. This seems like a
               // JGroups bug, but the purpose of this test isn't to expose it
               while (index % 2 == lastIndex % 2)
               {
                  factor = random.nextInt(50);
                  index = factor % SUBTREE_SIZE;
               }

               lastIndex = index;

               TestingUtil.sleepThread(factor);

               fqn = "/" + name + "/" + String.valueOf(index);

               if (!loaded[index])
               {
                  cache.attach(fqn, people[index]);
                  loaded[index] = true;
                  log.info("TEST: " + name + " put Person at " + fqn);
               }
               else if (i % 2 == 0)
               {
                  int newAge = factor / SUBTREE_SIZE;
                  people[index].setAge(newAge);
               }
               else
               {
                  people[index].getAddress().setStreet(factor + " Test Street");
               }

               i++;
            }

            log.info("TEST: " + name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));

            semaphore.release();
            acquired = false;

            // Go to sleep until directed otherwise
            while (!stopped && putsStopped)
               TestingUtil.sleepThread(100);
         }
      }
      finally
      {
         // Hand back the ticket if the loop was left while still holding it
         if (acquired)
            semaphore.release();
      }
   }

   /**
    * Asks the stressor to leave its put loop and release its ticket.
    */
   public void stopPuts()
   {
      putsStopped = true;
      log.info("TEST: " + name + " putsStopped");
   }

   /**
    * Allows the stressor to begin (or resume) its put loop.
    */
   public void startPuts()
   {
      putsStopped = false;
   }

   /**
    * Asks the stressor to finish and interrupts its thread if it is alive.
    * Safe to call before <code>start()</code>, when no thread exists yet.
    */
   public void stopThread()
   {
      stopped = true;
      if (thread != null && thread.isAlive())
         thread.interrupt();
   }
}
/**
 * Base class for a test worker that owns its own PojoCache instance and
 * exercises it on a separate thread, gated by a shared semaphore.
 * Subclasses supply the actual work in {@link #useCache()}.
 */
private abstract class CacheUser implements Runnable
{
   protected Semaphore semaphore;
   protected PojoCache cache;
   protected TransactionManager tm;
   protected String name;
   protected Exception exception;
   protected Thread thread;

   /**
    * Creates the cache for this worker and verifies that it exposes a
    * transaction manager.
    *
    * @param semaphore    semaphore that must be acquired before the cache is used
    * @param name         name of this worker; also passed to the cache factory method
    * @param sync         passed through to {@code createCache}
    * @param activateRoot negated and passed through to {@code createCache}
    * @throws IllegalStateException if the cache has no transaction manager
    * @throws Exception             if the cache cannot be created
    */
   CacheUser(Semaphore semaphore,
             String name,
             boolean sync,
             boolean activateRoot)
         throws Exception
   {
      this.cache = createCache(name, sync, true, false, !activateRoot);

      CacheSPI spi = (CacheSPI) cache.getCache();
      this.tm = spi.getTransactionManager();
      if (this.tm == null)
      {
         throw new IllegalStateException("TransactionManager required");
      }

      this.semaphore = semaphore;
      this.name = name;

      String startedMsg = "TEST: Cache " + name + " started";
      log.info(startedMsg);
      System.out.println(startedMsg);
   }

   /**
    * Thread body: waits up to 60000 (milliseconds, per the util.concurrent
    * Semaphore API) for the semaphore, runs {@link #useCache()}, and always
    * gives the semaphore back if it was obtained. Any exception is logged
    * and stored so the test can inspect it through {@link #getException()}.
    */
   public void run()
   {
      String startedMsg = "TEST: " + name + " started";
      log.info(startedMsg);
      System.out.println(startedMsg);

      boolean permitHeld = false;
      try
      {
         permitHeld = semaphore.attempt(60000);
         if (permitHeld)
         {
            String acquiredMsg = "TEST: " + name + " acquired semaphore";
            log.info(acquiredMsg);
            System.out.println(acquiredMsg);
            useCache();
         }
         else
         {
            throw new Exception(name + " cannot acquire semaphore");
         }
      }
      catch (Exception e)
      {
         log.error("TEST: " + name + ": " + e.getLocalizedMessage(), e);
         // Remember the failure; the test checks it later
         exception = e;
      }
      finally
      {
         if (permitHeld)
         {
            semaphore.release();
         }
      }
   }

   /**
    * The work performed against the cache while the semaphore is held.
    *
    * @throws Exception on any failure; it is captured by {@link #run()}
    */
   abstract void useCache() throws Exception;

   /**
    * @return the exception captured by {@link #run()}, or {@code null} if none was recorded
    */
   public Exception getException()
   {
      return exception;
   }

   /**
    * @return the underlying cache obtained from this worker's PojoCache
    */
   public Cache getCache()
   {
      return cache.getCache();
   }

   /**
    * Creates a new thread around this worker and starts it.
    */
   public void start() throws Exception
   {
      Thread worker = new Thread(this);
      thread = worker;
      worker.start();
   }

   /**
    * Interrupts the worker thread if one was created and is still running.
    */
   public void cleanup()
   {
      if (thread == null)
      {
         return;
      }
      if (thread.isAlive())
      {
         thread.interrupt();
      }
   }
}
}
1.1 date: 2006/10/10 13:35:47; author: bwang; state: Exp;JBossCache/tests-50/functional/org/jboss/cache/pojo/statetransfer/StateTransfer200AopTest.java
Index: StateTransfer200AopTest.java
===================================================================
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.pojo.statetransfer;
/**
 * Runs the PojoCache state transfer tests from
 * {@link StateTransferAopTestBase} with the replication version fixed at
 * 2.0.0.
 *
 * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
 * @version $Revision: 1.1 $
 */
public class StateTransfer200AopTest extends StateTransferAopTestBase
{
   /** Replication version string reported by this test variant. */
   private static final String REPLICATION_VERSION = "2.0.0.GA";

   /**
    * Supplies the replication version for this test variant (presumably
    * consumed by the base class when configuring its caches -- the base
    * class hook is not visible here).
    *
    * @return the fixed version string {@code "2.0.0.GA"}
    */
   protected String getReplicationVersion()
   {
      return REPLICATION_VERSION;
   }
}
1.1 date: 2006/10/10 13:35:47; author: bwang; state: Exp;JBossCache/tests-50/functional/org/jboss/cache/pojo/statetransfer/ReplicatedTest.java
Index: ReplicatedTest.java
===================================================================
/*
* JBoss, Home of Professional Open Source.
* Copyright 2006, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.cache.pojo.statetransfer;
import junit.framework.TestCase;
import junit.framework.Test;
import junit.framework.TestSuite;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.pojo.PojoCache;
import org.jboss.cache.pojo.PojoCacheFactory;
import org.jboss.cache.pojo.test.Person;
import org.jboss.cache.pojo.test.Student;
import java.util.List;
/**
 * Simple replicated test for state transfer: an object is attached to a
 * first cache, a second cache is then created from the same configuration,
 * and the test checks that the object can be found through the second cache.
 *
 * @author Ben Wang
 */
public class ReplicatedTest extends TestCase
{
   Log log = LogFactory.getLog(ReplicatedTest.class);
   PojoCache cache, cache1;

   public ReplicatedTest(String name)
   {
      super(name);
   }

   protected void setUp() throws Exception
   {
      super.setUp();
      log.info("setUp() ....");
   }

   /**
    * Stops whichever caches the test managed to create.
    * <p>
    * Either cache may still be {@code null} if the test failed early, so
    * both are null-checked; otherwise a NullPointerException raised here
    * would hide the original test failure. The second cache is stopped in a
    * {@code finally} block so that a failure stopping the first one does not
    * leave the second running.
    */
   protected void tearDown() throws Exception
   {
      super.tearDown();
      try
      {
         if (cache != null)
            cache.stop();
      }
      finally
      {
         if (cache1 != null)
            cache1.stop();
      }
   }

   // public void testDummy() {}

   /**
    * Builds a {@link Person} and attaches it to the first cache.
    *
    * @param id   cache id under which the person is attached
    * @param name person name
    * @param age  person age
    * @return the person instance that was attached
    */
   private Person createPerson(String id, String name, int age)
   {
      Person p = new Person();
      p.setName(name);
      p.setAge(age);
      cache.attach(id, p);
      return p;
   }

   /**
    * Builds a {@link Student} and attaches it to the first cache.
    *
    * @param id    cache id under which the student is attached
    * @param name  student name
    * @param age   student age
    * @param grade value stored through {@code setYear}
    * @return the student instance that was attached
    */
   private Student createStudent(String id, String name, int age, String grade)
   {
      Student p = new Student();
      p.setName(name);
      p.setAge(age);
      p.setYear(grade);
      cache.attach(id, p);
      return p;
   }

   /**
    * Attaches a person to the first cache, brings up a second cache and
    * verifies that the person is visible through it.
    */
   public void testSimple() throws Exception
   {
      String configFile = "META-INF/replSync-service.xml";
      boolean toStart = true;
      cache = PojoCacheFactory.createInstance(configFile, toStart);
      Person ben = createPerson("/person/test1", "Ben Wang", 40);

      System.out.println("\n*** I ***");
      System.out.println(((org.jboss.cache.TreeCacheProxyImpl)cache.getCache()).printDetails());

      cache1 = PojoCacheFactory.createInstance(configFile, toStart);
      // NOTE(review): toStart is already true above, so this start() looks
      // redundant -- kept as in the original; confirm against PojoCacheFactory.
      cache1.start();

      System.out.println("\n*** II ***");
      System.out.println(((org.jboss.cache.TreeCacheProxyImpl)cache1.getCache()).printDetails());

      Person p = (Person)cache1.find("/person/test1");
      log.info("testSimple() ....");
      assertEquals("Ben Wang", ben.getName());
      // Fail with a clear message rather than a NullPointerException when
      // the object did not make it to the second cache.
      assertNotNull("Person should be found in the second cache", p);
      assertEquals("Ben Wang", p.getName());

      cache.detach("/person/test1");
   }

   public static Test suite() throws Exception
   {
      return new TestSuite(ReplicatedTest.class);
   }

   public static void main(String[] args) throws Exception
   {
      junit.textui.TestRunner.run(suite());
   }
}
More information about the jboss-cvs-commits
mailing list