[jboss-cvs] JBossCache/tests-50/functional/org/jboss/cache/pojo/statetransfer ...

Ben Wang bwang at jboss.com
Tue Oct 10 09:35:47 EDT 2006


  User: bwang   
  Date: 06/10/10 09:35:47

  Added:       tests-50/functional/org/jboss/cache/pojo/statetransfer   
                        StateTransferAopTestBase.java
                        StateTransfer200AopTest.java ReplicatedTest.java
  Log:
  State transfer test case.
  
  Revision  Changes    Path
  1.1      date: 2006/10/10 13:35:47;  author: bwang;  state: Exp;JBossCache/tests-50/functional/org/jboss/cache/pojo/statetransfer/StateTransferAopTestBase.java
  
  Index: StateTransferAopTestBase.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.cache.pojo.statetransfer;
  
  import EDU.oswego.cs.dl.util.concurrent.Semaphore;
  import junit.framework.TestCase;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jboss.cache.CacheException;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.TreeCache;
  import org.jboss.cache.Cache;
  import org.jboss.cache.CacheSPI;
  import org.jboss.cache.pojo.PojoCache;
  import org.jboss.cache.pojo.PojoCacheFactory;
  import org.jboss.cache.pojo.test.Address;
  import org.jboss.cache.pojo.test.Person;
  import org.jboss.cache.config.Configuration;
  import org.jboss.cache.factories.XmlConfigurationParser;
  import org.jboss.cache.loader.CacheLoader;
  import org.jboss.cache.misc.TestingUtil;
  import org.jboss.cache.xml.XmlHelper;
  import org.w3c.dom.Element;
  
  import javax.transaction.TransactionManager;
  import java.io.File;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Random;
  import java.util.Set;
  
  /**
   * Tests state transfer in PojoCache.
   *
   * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
   * @version $Revision: 1.1 $
   */
  public abstract class StateTransferAopTestBase extends TestCase
  {
     // Caches created through createCache(), keyed by cache ID.
     // Re-created in setUp() and walked in tearDown() to stop each cache.
     private Map caches;

     // String form of the node paths the test POJOs are attached under
     public static final String A_B_1 = "/a/b/1";
     public static final String A_B_2 = "/a/b/2";
     public static final String A_C_1 = "/a/c/1";
     public static final String A_C_2 = "/a/c/2";

     // Fqn form of the same four paths, used for direct CacheLoader lookups
     public static final Fqn A_B_1_f = Fqn.fromString("/a/b/1");
     public static final Fqn A_B_2_f = Fqn.fromString("/a/b/2");
     public static final Fqn A_C_1_f = Fqn.fromString("/a/c/1");
     public static final Fqn A_C_2_f = Fqn.fromString("/a/c/2");

     // Number of child nodes compared per subtree in XconcurrentUseTest
     private static final int SUBTREE_SIZE = 10;

     // Test fixtures, rebuilt for every test in setUp():
     // joe and jane share addr1, bob and jill share addr2
     private Person joe;
     private Person bob;
     private Person jane;
     private Person jill;
     private Address addr1;
     private Address addr2;

     // Ages assigned to the fixtures in setUp() and expected back from the cache loader
     public static final Integer TWENTY = new Integer(20);
     public static final Integer TWENTYFIVE = new Integer(25);
     public static final Integer FORTY = new Integer(40);

     // Logger shared by all tests in this class
     private Log log = LogFactory.getLog(StateTransferAopTestBase.class);
  
     /**
      * Attaches the four fixture persons to a first cache, then creates a
      * second cache and checks that every person can be found through it with
      * the expected name and city, and that persons who shared an Address
      * before the transfer still share one instance afterwards.
      *
      * @throws Exception if creating a cache or any cache operation fails
      */
     public void testInitialStateTransfer() throws Exception
     {
        log.info("Enter testInitialStateTransfer");

        // Populate the first cache before the second one exists
        PojoCache source = createCache("cache1", true, false, false);
        source.attach(A_B_1, joe);
        source.attach(A_B_2, jane);
        source.attach(A_C_1, bob);
        source.attach(A_C_2, jill);

        PojoCache target = createCache("cache2", true, false, false);

        // No explicit wait for the cluster views is performed in this test

        // Look everything up first, then verify
        Person joeCopy = (Person) target.find(A_B_1);
        Person janeCopy = (Person) target.find(A_B_2);
        Person bobCopy = (Person) target.find(A_C_1);
        Person jillCopy = (Person) target.find(A_C_2);

        // Subtree /a/b: Joe and Jane, both living at addr1
        assertEquals("Name for /a/b/1 is Joe", joe.getName(), joeCopy.getName());
        assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), joeCopy.getAddress().getCity());
        assertEquals("Name for /a/b/2 is Jane", jane.getName(), janeCopy.getName());
        assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), janeCopy.getAddress().getCity());
        assertTrue("Joe and Jane have same Address", joeCopy.getAddress() == janeCopy.getAddress());

        // Subtree /a/c: Bob and Jill, both living at addr2
        assertEquals("Name for /a/c/1 is Bob", bob.getName(), bobCopy.getName());
        assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), bobCopy.getAddress().getCity());
        assertEquals("Name for /a/c/2 is Jill", jill.getName(), jillCopy.getName());
        assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), jillCopy.getAddress().getCity());
        assertTrue("Bob and Jill have same Address", bobCopy.getAddress() == jillCopy.getAddress());
     }
  
     /**
      * Same scenario as the plain initial state transfer test, but both caches
      * are created asynchronous and with a cache loader. After the second
      * cache has joined, the test first checks the names stored in the second
      * cache's loader and then the objects found through the second cache.
      *
      * @throws Exception if creating a cache or any cache/loader operation fails
      */
     public void testInitialStateTferWithLoader() throws Exception
     {
        log.info("Enter testInitialStateTferWithLoader");

        PojoCache cache1 = createCache("cache1", false, false, true);

        cache1.attach(A_B_1, joe);
        cache1.attach(A_B_2, jane);
        cache1.attach(A_C_1, bob);
        cache1.attach(A_C_2, jill);

        PojoCache cache2 = createCache("cache2", false, false, true);

        // Pause to give caches time to see each other
        TestingUtil.blockUntilViewsReceived(new Cache[]
                {cache1.getCache(), cache2.getCache()}, 60000);

        // Inspect the second cache's persistent store directly
        CacheLoader loader = ((CacheSPI)cache2.getCache()).getCacheLoader();

        assertEquals("Loader name for /a/b/1 is Joe", joe.getName(), loader.get(A_B_1_f).get("name"));
        assertEquals("Loader name for /a/b/2 is Jane", jane.getName(), loader.get(A_B_2_f).get("name"));
        // Message previously said "Joe" although the node under test holds Bob
        assertEquals("Loader name for /a/c/1 is Bob", bob.getName(), loader.get(A_C_1_f).get("name"));
        assertEquals("Loader name for /a/c/2 is Jill", jill.getName(), loader.get(A_C_2_f).get("name"));

        // Now check the objects as seen through the second cache
        Person ab1 = (Person) cache2.find(A_B_1);
        Person ab2 = (Person) cache2.find(A_B_2);
        Person ac1 = (Person) cache2.find(A_C_1);
        Person ac2 = (Person) cache2.find(A_C_2);
        assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
        assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
        assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
        assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
        assertTrue("Joe and Jane have same Address", ab1.getAddress() == ab2.getAddress());
        assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
        assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
        assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
        assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
        assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
     }
  
     /**
      * Exercises region-by-region (partial) state transfer between two
      * asynchronous caches created with region-based marshalling.
      * <p/>
      * The first cache activates /a and attaches Joe and Jane. The second cache
      * must not see them until it activates /a/b. The same is then repeated
      * for Bob and Jill under /a/c. Finally /a is deactivated on the first
      * cache, the four leaf regions are activated there and the data is
      * checked through the first cache.
      *
      * @throws Exception if creating a cache or any cache operation fails
      */
     public void testPartialStateTransfer() throws Exception
     {
        log.info("Enter testPartialStateTransfer");

        PojoCache cache1 = createCache("cache1", false, true, false);
        cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();

        cache1.attach(A_B_1, joe);
        cache1.attach(A_B_2, jane);

        PojoCache cache2 = createCache("cache2", false, true, false);

        // Pause to give caches time to see each other
        TestingUtil.blockUntilViewsReceived(new Cache[]
                {cache1.getCache(), cache2.getCache()}, 60000);

        // Nothing has been activated on cache2 yet
        assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
        assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));

        cache2.getCache().getRegion(Fqn.fromString("/a/b"), true).activate();

        Person ab1 = (Person) cache2.find(A_B_1);
        Person ab2 = (Person) cache2.find(A_B_2);
        assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
        assertEquals("City for /a/b/1 is Anytown", joe.getAddress().getCity(), ab1.getAddress().getCity());
        assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
        assertEquals("City for /a/b/2 is Anytown", jane.getAddress().getCity(), ab2.getAddress().getCity());
        assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());

        cache1.attach(A_C_1, bob);
        cache1.attach(A_C_2, jill);

        // /a/c is still inactive on cache2
        assertNull("/a/c/1 not transferred per policy", cache2.find(A_C_1));

        cache2.getCache().getRegion(Fqn.fromString("/a/c"), true).activate();

        Person ac1 = (Person) cache2.find(A_C_1);
        Person ac2 = (Person) cache2.find(A_C_2);
        assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
        assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
        assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
        assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
        assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());

        // Swap the single /a region on cache1 for four leaf-level regions
        cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();

        cache1.getCache().getRegion(Fqn.fromString("/a/b/1"), true).activate();
        cache1.getCache().getRegion(Fqn.fromString("/a/b/2"), true).activate();
        cache1.getCache().getRegion(Fqn.fromString("/a/c/1"), true).activate();
        cache1.getCache().getRegion(Fqn.fromString("/a/c/2"), true).activate();

        ab1 = (Person) cache1.find(A_B_1);
        assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
        assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
        ab2 = (Person) cache1.find(A_B_2);
        // Messages below previously named /a/b/1 although they check /a/b/2
        assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
        assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
        assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
        ac1 = (Person) cache1.find(A_C_1);
        assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
        assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
        ac2 = (Person) cache1.find(A_C_2);
        assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
        assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
        assertTrue("Address for Bob and Jill is the same object", ac1.getAddress() == ac2.getAddress());
     }
  
     /**
      * Partial state transfer scenario with a cache loader configured on both
      * asynchronous, region-marshalled caches. In addition to the objects
      * found through the caches, the test inspects the second cache's loader
      * to verify what has (and has not) reached its persistent store.
      *
      * @throws Exception if creating a cache or any cache/loader operation fails
      */
     public void testPartialStateTransferWithLoader() throws Exception
     {
        log.info("Enter testPartialStateTransferWithLoader");

        PojoCache cache1 = createCache("cache1", false, true, true);
        cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();

        cache1.attach(A_B_1, joe);
        cache1.attach(A_B_2, jane);

        PojoCache cache2 = createCache("cache2", false, true, true);

        // Pause to give caches time to see each other
        TestingUtil.blockUntilViewsReceived(new Cache[]
                {cache1.getCache(), cache2.getCache()}, 60000);

        CacheLoader loader = ((CacheSPI)cache2.getCache()).getCacheLoader();

        // The loader may or may not know the node; if it does, it must hold no data yet
        Map map = loader.get(A_B_1_f);
        if (map != null)
        {
           assertNull("/a/b/1 name not transferred per policy", map.get("name"));
           assertNull("/a/b/1 age not transferred per policy", map.get("age"));
        }
        map = loader.get(A_B_2_f);
        if (map != null)
        {
           // Messages previously named /a/b/1 although they check /a/b/2
           assertNull("/a/b/2 name not transferred per policy", map.get("name"));
           assertNull("/a/b/2 age not transferred per policy", map.get("age"));
        }
        assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
        assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));

        // NOTE(review): testPartialStateTransfer activates /a/b on cache2 at this
        // point; here it is cache1. Looks like it should be cache2 -- confirm.
        cache1.getCache().getRegion(Fqn.fromString("/a/b"), true).activate();

        assertEquals("Correct name from loader for /a/b/1", joe.getName(), loader.get(A_B_1_f).get("name"));
        assertEquals("Correct age from loader for /a/b/1", TWENTY, loader.get(A_B_1_f).get("age"));
        assertEquals("Correct name from loader for /a/b/2", jane.getName(), loader.get(A_B_2_f).get("name"));
        assertEquals("Correct age from loader for /a/b/2", TWENTYFIVE, loader.get(A_B_2_f).get("age"));


        Person ab1 = (Person) cache2.find(A_B_1);
        Person ab2 = (Person) cache2.find(A_B_2);
        assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
        assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
        assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
        assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
        assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());

        cache1.attach(A_C_1, bob);
        cache1.attach(A_C_2, jill);

        assertNull("/a/c/1 not transferred per policy", cache2.find(A_C_1));

        // NOTE(review): same cache1-versus-cache2 question as for /a/b above
        cache1.getCache().getRegion(Fqn.fromString("/a/c"), true).activate();

        assertEquals("Correct name from loader for /a/b/1", joe.getName(), loader.get(A_B_1_f).get("name"));
        assertEquals("Correct age from loader for /a/b/1", TWENTY, loader.get(A_B_1_f).get("age"));
        assertEquals("Correct name from loader for /a/b/2", jane.getName(), loader.get(A_B_2_f).get("name"));
        assertEquals("Correct age from loader for /a/b/2", TWENTYFIVE, loader.get(A_B_2_f).get("age"));
        assertEquals("Correct name from loader for /a/c/1", bob.getName(), loader.get(A_C_1_f).get("name"));
        assertEquals("Correct age from loader for /a/c/1", FORTY, loader.get(A_C_1_f).get("age"));
        assertEquals("Correct name from loader for /a/c/2", jill.getName(), loader.get(A_C_2_f).get("name"));
        assertEquals("Correct age from loader for /a/c/2", TWENTYFIVE, loader.get(A_C_2_f).get("age"));

        Person ac1 = (Person) cache2.find(A_C_1);
        Person ac2 = (Person) cache2.find(A_C_2);
        assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
        assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
        assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
        assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
        assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());

        // Swap the single /a region on cache1 for four leaf-level regions
        cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();

        cache1.getCache().getRegion(Fqn.fromString("/a/b/1"), true).activate();
        cache1.getCache().getRegion(Fqn.fromString("/a/b/2"), true).activate();
        cache1.getCache().getRegion(Fqn.fromString("/a/c/1"), true).activate();
        cache1.getCache().getRegion(Fqn.fromString("/a/c/2"), true).activate();

        ab1 = (Person) cache1.find(A_B_1);
        assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
        assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
        ab2 = (Person) cache1.find(A_B_2);
        assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
        assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
        assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
        ac1 = (Person) cache1.find(A_C_1);
        assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
        assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
        ac2 = (Person) cache1.find(A_C_2);
        assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
        assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
        assertTrue("Address for Bob and Jill is the same object", ac1.getAddress() == ac2.getAddress());
     }
  
  
     /**
      * Runs the concurrent region activation scenario with synchronous
      * replication (REPL_SYNC), i.e. several nodes activating the same subtree
      * at once as would happen with a farmed deployment. The scenario itself
      * lives in <code>concurrentActivationTest</code>.
      *
      * @throws Exception if the scenario fails
      */
     public void testConcurrentActivationSync() throws Exception
     {
        log.info("Enter testConcurrentActivationSync");

        final boolean useReplSync = true;
        concurrentActivationTest(useReplSync);
     }
  
     /**
      * Runs the concurrent region activation scenario with asynchronous
      * replication (REPL_ASYNC), i.e. several nodes activating the same subtree
      * at once as would happen with a farmed deployment. The scenario itself
      * lives in <code>concurrentActivationTest</code>.
      *
      * @throws Exception if the scenario fails
      */
     public void testConcurrentActivationAsync() throws Exception
     {
        log.info("Enter testConcurrentActivationAsync");

        final boolean useReplSync = false;
        concurrentActivationTest(useReplSync);
     }
  
     /**
      * Starts 5 caches and then concurrently activates the same region under
      * all 5, causing each to attempt a partial state transfer from the others.
      * As soon as each cache has activated its region, it does a put to a node
      * in the region, thus complicating the lives of the other caches trying
      * to get partial state.
      * <p/>
      * Failure condition is if any node sees an exception or if the final state
      * of all caches is not consistent.
      * <p/>
      * Exceptions raised while driving the scenario propagate to JUnit
      * unchanged so that their stack trace is reported. Every activator that
      * was actually created is cleaned up, whatever the outcome.
      *
      * @param sync use REPL_SYNC or REPL_ASYNC
      * @throws Exception if setting up or running the scenario fails
      */
     private void concurrentActivationTest(boolean sync) throws Exception
     {
        String[] names = {"A", "B", "C", "D", "E"};
        int count = names.length;
        CacheActivator[] activators = new CacheActivator[count];

        try
        {
           // Create a semaphore and take all its tickets
           Semaphore semaphore = new Semaphore(count);
           for (int i = 0; i < count; i++)
           {
              semaphore.acquire();
           }

           // Create activation threads that will block on the semaphore.
           // Note: this local array shadows the 'caches' field of the test.
           Cache[] caches = new Cache[count];
           for (int i = 0; i < count; i++)
           {
              activators[i] = new CacheActivator(semaphore, names[i], sync);
              caches[i] = activators[i].getCache();
              activators[i].start();
           }

           // Make sure everyone is in sync
           TestingUtil.blockUntilViewsReceived(caches, 60000);

           // Release the semaphore to allow the threads to start work
           semaphore.release(count);

           // Sleep to ensure the threads get all the semaphore tickets
           TestingUtil.sleepThread(1000);

           // Reacquire the semaphore tickets; when we have them all
           // we know the threads are done
           for (int i = 0; i < count; i++)
           {
              boolean acquired = semaphore.attempt(60000);
              if (!acquired)
                 fail("failed to acquire semaphore " + i);
           }

           // Sleep to allow any async calls to clear
           if (!sync)
              TestingUtil.sleepThread(500);

           // Ensure the caches held by the activators see all the values
           for (int i = 0; i < count; i++)
           {
              assertNull("Activator " + names[i] + " caught an exception",
                      activators[i].getException());

              for (int j = 0; j < count; j++)
              {
                 String fqn = "/a/b/" + names[j];
                 Person p = (Person) activators[i].getCacheValue(fqn);
                 assertNotNull(names[i] + ":" + fqn + " is not null", p);
                 assertEquals("Correct name for " + names[i] + ":" + fqn,
                         "Person " + names[j], p.getName());
                 assertEquals("Correct street for " + names[i] + ":" + fqn,
                         names[j] + " Test Street", p.getAddress().getStreet());
              }
           }
        }
        finally
        {
           // Slots stay null when setup failed part-way; skipping them keeps a
           // NullPointerException here from masking the original failure
           for (int i = 0; i < count; i++)
           {
              if (activators[i] != null)
                 activators[i].cleanup();
           }
        }
     }
  
     /**
      * Placeholder for the partial state transfer test under heavy concurrent
      * load with REPL_SYNC.
      * <p/>
      * The call into the scenario is commented out, so this test currently
      * only writes a log entry and always passes. The scenario method visible
      * in this class is named <code>XconcurrentUseTest</code>.
      *
      * @throws Exception declared for symmetry with the other tests; nothing
      *                   in the current body throws
      */
     public void testConcurrentUseSync() throws Exception
     {
        log.info("Enter testConcurrentUseSync");

  //      concurrentUseTest(true);
     }
  
     /**
      * Placeholder for the partial state transfer test under heavy concurrent
      * load with REPL_ASYNC.
      * <p/>
      * The call into the scenario is commented out, so this test currently
      * only writes a log entry and always passes. The scenario method visible
      * in this class is named <code>XconcurrentUseTest</code>.
      *
      * @throws Exception declared for symmetry with the other tests; nothing
      *                   in the current body throws
      */
     public void testConcurrentUseAsync() throws Exception
     {
        log.info("Enter testConcurrentUseAsync");

  //      concurrentUseTest(false);
     }
  
     /**
      * Initiates 5 caches: cacheA plus one cache per stressor (B, C, D, E).
      * Each stressor begins rapidly generating puts against nodes
      * in a subtree for which it is responsible. cacheA deactivates and then
      * activates each subtree, and at the end the test confirms no stressor
      * saw any exceptions and compares the state held by cacheA with the
      * state of the stressor caches.
      * <p/>
      * The two callers visible in this class (testConcurrentUseSync and
      * testConcurrentUseAsync) have their calls commented out, so this method
      * appears to be disabled at present.
      * <p/>
      * NOTE(review): the lookup that should populate <code>p2</code> is
      * commented out, so <code>p2</code> stays null; the comparison section
      * cannot succeed for any node that exists in cacheA until that is fixed.
      *
      * @param sync whether to use REPL_SYNC or REPL_ASYNC
      * @throws Exception if setup fails or a stressor reported an exception
      */
     private void XconcurrentUseTest(boolean sync) throws Exception
     {
        String[] names = {"B", "C", "D", "E"};
        int count = names.length;
        CacheStressor[] stressors = new CacheStressor[count];

        try
        {

           // cacheA uses region-based marshalling, no cache loader, and is
           // NOT marked inactive on startup (last argument is false)
           PojoCache cacheA = createCache("cacheA", sync, true, false, false);

           // Slot 0 is cacheA, slots 1..count are the stressor caches.
           // Note: this local array shadows the 'caches' field of the test.
           Cache[] caches = new Cache[count + 1];
           caches[0] = cacheA.getCache();

           // Create a semaphore and take all its tickets
           Semaphore semaphore = new Semaphore(count);
           for (int i = 0; i < count; i++)
           {
              semaphore.acquire();
           }

           // Create stressor threads that will block on the semaphore

           for (int i = 0; i < count; i++)
           {
              stressors[i] = new CacheStressor(semaphore, names[i], sync);
              caches[i + 1] = stressors[i].getCache();
              stressors[i].start();
              // Give each one a chance to stabilize
              TestingUtil.sleepThread(100);
           }

           // Make sure everyone's views are in sync
           TestingUtil.blockUntilViewsReceived(caches, 60000);

           // Repeat the basic test two times in order to involve inactivation
           for (int x = 0; x < 2; x++)
           {
  //            if (x > 0)
  //            {
              // Reset things by inactivating the region
              // and enabling the stressors
              // (done on every run, since the guard above is commented out)
              for (int i = 0; i < count; i++)
              {
                 cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).deactivate();
                 log.info("TEST: Run " + x + "-- /" + names[i] + " inactivated on A");
                 stressors[i].startPuts();
              }
  //            }

              // Release the semaphore to allow the threads to start work
              semaphore.release(count);

              // Sleep to ensure the threads get all the semaphore tickets
              // and to ensure puts are actively in progress
              TestingUtil.sleepThread(300);

              // Activate cacheA
              for (int i = 0; i < count; i++)
              {
                 log.info("TEST: Activating /" + names[i] + " on A");
                 cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).activate();
                 // Stop the stressor so we don't pollute cacheA's state
                 // with too many messages sent after activation -- we want
                 // to compare transferred state with the sender
                 stressors[i].stopPuts();
                 log.info("TEST: Run " + x + "-- /" + names[i] + " activated on A");
                 // Reacquire one semaphore ticket
                 boolean acquired = semaphore.attempt(60000);
                 if (!acquired)
                    fail("failed to acquire semaphore " + names[i]);
                 log.info("TEST: Run " + x + "-- acquired semaphore from " + names[i]);

                 // Pause to allow other work to proceed
                 TestingUtil.sleepThread(100);
              }

              // Sleep to allow any in transit msgs to clear
              if (!sync)
                 TestingUtil.sleepThread(2000);

              // Ensure the stressors saw no exceptions
              for (int i = 0; i < count; i++)
              {
                 Exception e = stressors[i].getException();
                 if (e != null)
                 {
                    log.error("Stressor " + names[i] + " caught an exception",
                            e);
                    throw e;
                 }
              }

  //            log.info("Cache A details:\n" + cacheA.printDetails());

              // Compare cache contents
              Person p1 = null;
              Person p2 = null;
              for (int i = 0; i < count; i++)
              {
  //               log.info("Cache " + names[i] + " details:\n" +
  //                       stressors[i].getTreeCache().printDetails());

                 for (int j = 0; j < SUBTREE_SIZE; j++)
                 {

                    String fqn = "/" + names[i] + "/" + j;
                    log.info("TEST: Getting A:" + fqn);
                    p1 = (Person) cacheA.find(fqn);
                    boolean p1Null = p1 == null;
                    log.info("TEST: Getting " + names[i] + ":" + fqn);
                    // NOTE(review): with the lookup below disabled, p2 is never
                    // assigned and p2Null is always true
  //                  p2 = (Person) stressors[i].getCache().find(fqn);
                    boolean p2Null = p2 == null;
                    assertEquals("Run " + x + ": " + fqn +
                            " null status matches", p1Null, p2Null);
                    if (!p1Null)
                    {
                       assertEquals("Run " + x + ": A:" + fqn + " age matches " + names[i] + ":" + fqn,
                               p1.getAge(), p2.getAge());
                       assertEquals("Run " + x + ": A:" + fqn + " name matches " + names[i] + ":" + fqn,
                               p1.getName(), p2.getName());
                       assertEquals("Run " + x + ": A:" + fqn + " address matches " + names[i] + ":" + fqn,
                               p1.getAddress().getStreet(),
                               p2.getAddress().getStreet());
                    }
                 }
              }
           }

           // Normal completion: ask every stressor thread to stop
           for (int i = 0; i < count; i++)
              stressors[i].stopThread();

        }
        finally
        {
           // Slots are null if setup failed before the stressor was created
           for (int i = 0; i < count; i++)
           {
              if (stressors[i] != null)
                 stressors[i].cleanup();
           }
        }

     }
  
     /**
      * Convenience overload of the five-argument <code>createCache</code> that
      * always passes <code>true</code> for <code>inactiveOnStartup</code>.
      *
      * @param cacheID        key under which the cache is registered with this test
      * @param sync           <code>true</code> to use the REPL_SYNC configuration file,
      *                       <code>false</code> for REPL_ASYNC
      * @param useMarshalling whether to enable region-based marshalling
      * @param useCacheLoader whether to configure a file-based cache loader
      * @return the newly created cache
      * @throws Exception if the cache cannot be configured or created
      */
     protected PojoCache createCache(String cacheID, boolean sync, boolean useMarshalling, boolean useCacheLoader)
             throws Exception
     {
        final boolean inactiveOnStartup = true;
        return createCache(cacheID, sync, useMarshalling, useCacheLoader, inactiveOnStartup);
     }
  
     /**
      * Builds a PojoCache from one of the two replication configuration files
      * and registers it under <code>cacheID</code> so that
      * <code>tearDown()</code> can stop it.
      *
      * @param cacheID           key under which the cache is registered; must not
      *                          have been used already in the current test
      * @param sync              <code>true</code> to parse
      *                          <code>META-INF/replSync-service.xml</code>,
      *                          <code>false</code> for
      *                          <code>META-INF/replAsync-service.xml</code>
      * @param useMarshalling    whether to enable region-based marshalling
      * @param useCacheLoader    whether to configure a file-based cache loader
      * @param inactiveOnStartup value for the inactive-on-startup setting; only
      *                          applied when <code>useMarshalling</code> is set
      * @return the newly created cache
      * @throws IllegalStateException if a cache is already registered under
      *                               <code>cacheID</code>
      * @throws Exception             if the cache cannot be configured or created
      */
     protected PojoCache createCache(String cacheID, boolean sync,
                                     boolean useMarshalling,
                                     boolean useCacheLoader,
                                     boolean inactiveOnStartup)
             throws Exception
     {
        Object existing = caches.get(cacheID);
        if (existing != null)
        {
           throw new IllegalStateException(cacheID + " already created");
        }

        String configFile;
        if (sync)
        {
           configFile = "META-INF/replSync-service.xml";
        }
        else
        {
           configFile = "META-INF/replAsync-service.xml";
        }

        Configuration config = new XmlConfigurationParser().parseFile(configFile);
        config.setClusterName("StateTransferTestBase");
        config.setReplVersionString(getReplicationVersion());
        // Generous timeout so that a debugger breakpoint does not abort the transfer
        config.setInitialStateRetrievalTimeout(60000);

        if (useMarshalling)
        {
           config.setUseRegionBasedMarshalling(true);
           config.setInactiveOnStartup(inactiveOnStartup);
        }

        if (useCacheLoader)
        {
           configureCacheLoader(config, cacheID, useMarshalling);
        }

        PojoCache created = PojoCacheFactory.createInstance(config, true);
        // Register the cache so that tearDown() will stop and destroy it
        caches.put(cacheID, created);
        return created;
     }
  
     /**
      * Prepares an empty directory for the given cache under the temp
      * location and installs a file-based cache loader configuration pointing
      * at it.
      *
      * @param c           configuration to receive the cache loader settings
      * @param cacheID     cache ID; determines the directory name
      * @param useExtended <code>true</code> to use
      *                    <code>FileExtendedCacheLoader</code>, <code>false</code>
      *                    for <code>FileCacheLoader</code>
      * @throws Exception if the loader configuration cannot be built
      */
     protected void configureCacheLoader(Configuration c, String cacheID, boolean useExtended) throws Exception
     {
        String location = getTempLocation(cacheID);

        // Wipe whatever an earlier, possibly failed, run left behind
        File directory = new File(location);
        cleanFile(directory);
        directory.mkdir();

        String escapedLocation = escapeWindowsPath(location);
        String loaderClass = useExtended
                ? "org.jboss.cache.loader.FileExtendedCacheLoader"
                : "org.jboss.cache.loader.FileCacheLoader";

        c.setCacheLoaderConfiguration(getCacheLoaderConfig(loaderClass, escapedLocation));
     }
  
  
     /**
      * Builds the cache loader <code>&lt;config&gt;</code> XML element for a
      * single, non-passivating, synchronous loader that fetches persistent state.
      *
      * @param cl  fully qualified class name of the cache loader
      * @param loc value written to the loader's <code>location</code> property
      * @return the parsed configuration element
      * @throws Exception if the XML cannot be converted to an element
      */
     protected Element getCacheLoaderConfig(String cl, String loc) throws Exception
     {
        StringBuffer xml = new StringBuffer();
        xml.append("            <config>\n");
        xml.append("                \n");
        xml.append("                <passivation>false</passivation>\n");
        xml.append("                <preload></preload>\n");
        xml.append("\n");
        xml.append("                <cacheloader>\n");
        // Loader implementation and the directory it stores its data in
        xml.append("                    <class>").append(cl).append("</class>\n");
        xml.append("                    <properties>\n");
        xml.append("                        location=").append(loc).append("\n");
        xml.append("                    </properties>\n");
        xml.append("                    <async>false</async>\n");
        xml.append("                    <fetchPersistentState>true</fetchPersistentState>\n");
        xml.append("                    <ignoreModifications>false</ignoreModifications>\n");
        xml.append("                </cacheloader>\n");
        xml.append("                \n");
        xml.append("            </config>");

        return XmlHelper.stringToElement(xml.toString());
     }
  
     /**
      * Returns the absolute path of the per-cache directory, located directly
      * below the directory named by the <code>java.io.tmpdir</code> system
      * property (falling back to <code>c:\tmp</code> when the property is unset).
      *
      * @param cacheID cache ID, used as the directory name
      * @return absolute path of the directory for that cache
      */
     protected String getTempLocation(String cacheID)
     {
        File tmpRoot = new File(System.getProperty("java.io.tmpdir", "c:\\tmp"));
        return new File(tmpRoot, cacheID).getAbsolutePath();
     }
  
     /**
      * Doubles every backslash in <code>path</code> when running on a
      * platform whose file separator is not <code>/</code>; on other platforms
      * the path is returned untouched.
      *
      * @param path file system path to escape
      * @return the path with each backslash doubled, or <code>path</code>
      *         itself when the platform separator is <code>/</code>
      */
     protected String escapeWindowsPath(String path)
     {
        if (File.separatorChar != '/')
        {
           StringBuffer escaped = new StringBuffer();
           for (int pos = 0; pos < path.length(); pos++)
           {
              char ch = path.charAt(pos);
              if (ch == '\\')
              {
                 // One backslash in, two backslashes out
                 escaped.append("\\\\");
              }
              else
              {
                 escaped.append(ch);
              }
           }
           return escaped.toString();
        }

        return path;
     }
  
     /**
      * Supplies the replication version string that <code>createCache</code>
      * sets on every cache configuration via <code>setReplVersionString</code>.
      * Concrete subclasses choose the version under test.
      *
      * @return the replication version string to configure
      */
     protected abstract String getReplicationVersion();
  
     /**
      * Creates a fresh cache registry and rebuilds the test fixtures: two
      * addresses and four persons. Joe and Jane share <code>addr1</code>
      * (Anytown), Bob and Jill share <code>addr2</code> (Fremont). Each
      * person gets a name, an age, an address and a set of two skills.
      *
      * @throws Exception if the superclass setup fails
      */
     protected void setUp() throws Exception
     {
        super.setUp();

        caches = new HashMap();

        addr1 = new Address();
        addr1.setStreet("101 Oakview Dr");
        addr1.setCity("Anytown");
        addr1.setZip(11111);

        addr2 = new Address();
        addr2.setStreet("222 Happy Dr");
        addr2.setCity("Fremont");
        addr2.setZip(22222);

        joe = new Person();
        joe.setName("Joe");
        joe.setAge(TWENTY.intValue());
        joe.setAddress(addr1);
        Set skills = new HashSet();
        skills.add("TENNIS");
        skills.add("CARPENTRY");
        joe.setSkills(skills);

        jane = new Person();
        jane.setName("Jane");
        jane.setAge(TWENTYFIVE.intValue());
        jane.setAddress(addr1);
        skills = new HashSet();
        skills.add("JUJITSU");
        skills.add("MACRAME");
        jane.setSkills(skills);

        bob = new Person();
        bob.setName("Bob");
        bob.setAge(FORTY.intValue());
        bob.setAddress(addr2);
        skills = new HashSet();
        skills.add("LANGUAGES");
        skills.add("LAWN BOWLING");
        bob.setSkills(skills);

        jill = new Person();
        jill.setName("Jill");
        jill.setAge(TWENTYFIVE.intValue());
        jill.setAddress(addr2);
        skills = new HashSet();
        skills.add("FORTRAN");
        skills.add("COBOL");
        // These are Jill's skills; they were previously assigned to jane,
        // which overwrote Jane's own skills and left Jill without any
        jill.setSkills(skills);
     }
  
     /**
      * Stops every cache registered in <code>caches</code> and removes the
      * temporary location associated with each cache id.
      *
      * @throws Exception if the superclass tear-down fails
      */
     protected void tearDown() throws Exception
     {
        super.tearDown();

        // Work on a snapshot of the ids rather than on the live key set.
        String[] ids = (String[]) caches.keySet().toArray(new String[caches.size()]);
        for (String id : ids)
        {
           PojoCache registered = (PojoCache) caches.get(id);
           stopCache(registered);

           cleanFile(new File(getTempLocation(id)));
        }
     }
  
     /**
      * Stops and then destroys the given cache on a best-effort basis.
      * Failures are logged, never propagated, so one misbehaving cache cannot
      * abort the clean-up of the others.
      *
      * @param cache the cache to shut down; <code>null</code> is ignored
      */
     protected void stopCache(PojoCache cache)
     {
        if (cache == null)
        {
           return;
        }

        try
        {
           cache.stop();
        }
        catch (Exception e)
        {
           log.error("Exception stopping cache " + e.getMessage(), e);
        }

        // Attempt destroy() even when stop() failed, so that whatever can still
        // be released is released before the next test starts.
        try
        {
           cache.destroy();
        }
        catch (Exception e)
        {
           log.error("Exception destroying cache " + e.getMessage(), e);
        }
     }
  
     /**
      * Deletes a file or a directory tree, children first. Anything that still
      * exists after the delete attempt is scheduled for deletion at JVM exit.
      *
      * @param file the file or directory to remove
      */
     protected void cleanFile(File file)
     {
        // listFiles() yields null for anything that is not a readable directory.
        File[] entries = file.listFiles();
        if (entries != null)
        {
           for (File entry : entries)
           {
              cleanFile(entry);
           }
        }

        if (!file.exists())
        {
           return;
        }

        file.delete();
        if (file.exists())
        {
           file.deleteOnExit();
        }
     }
  
     /**
      * Cache user that activates the region <code>/a/b</code> and then attaches a
      * single Person under <code>/a/b/&lt;name&gt;</code>.
      */
     private class CacheActivator extends CacheUser
     {

        CacheActivator(Semaphore semaphore, String name, boolean sync) throws Exception
        {
           // The trailing false is the activateRoot argument of CacheUser.
           super(semaphore, name, sync, false);
        }

        /**
         * Activates the region, builds a Person with an Address derived from
         * this user's name and attaches it to the cache.
         */
         void useCache() throws Exception
        {
           Fqn region = Fqn.fromString("/a/b");
           cache.getCache().getRegion(region, true).activate();
           log.info("TEST: " + name + " activated region" + " " + System.currentTimeMillis());

           Address home = new Address();
           home.setStreet(name + " Test Street");
           home.setCity(name + ", CA");

           Person resident = new Person();
           resident.setName("Person " + name);
           resident.setAddress(home);

           TestingUtil.sleepThread(1);

           // The attach runs without an explicit transaction; the
           // tm.begin()/setRollbackOnly()/commit() wrapper that used to surround
           // it is disabled in this test.
           String target = "/a/b/" + name;
           cache.attach(target, resident);
           log.info("TEST: " + name + " put fqn " + target + " " + System.currentTimeMillis());
        }

        /**
         * Looks up the object attached at the given fqn in this user's cache.
         *
         * @param fqn id of the attached object
         * @return whatever <code>cache.find(fqn)</code> returns
         */
        public Object getCacheValue(String fqn) throws CacheException
        {
           Object found = cache.find(fqn);
           return found;
        }
     }
  
     /**
      * Cache user that keeps attaching and modifying Person objects under
      * <code>/&lt;name&gt;/&lt;index&gt;</code> until it is paused with
      * {@link #stopPuts()} or terminated with {@link #stopThread()}.
      */
     private class CacheStressor extends CacheUser
     {
        private Random random;

        // Both flags are flipped through the public control methods below and
        // polled inside useCache(), which runs on the worker thread created by
        // CacheUser.start(). volatile guarantees the worker sees the updates.
        private volatile boolean putsStopped = false;
        private volatile boolean stopped = false;

        CacheStressor(Semaphore semaphore,
                      String name,
                      boolean sync)
                throws Exception
        {
           // The trailing true is the activateRoot argument of CacheUser.
           super(semaphore, name, sync, true);

           random = new Random(System.currentTimeMillis() + name.hashCode());
        }

        /**
         * Runs put/modify rounds. A round ends when puts are stopped; the
         * semaphore is then released and the thread idles until puts are
         * restarted (a new round re-acquires the semaphore) or the stressor
         * is stopped.
         *
         * @throws Exception if the semaphore cannot be re-acquired within 60
         *                   seconds, or if a cache operation fails
         */
        void useCache() throws Exception
        {
           // Do lots of puts into the cache.  Use our own nodes,
           // as we're not testing conflicts between writer nodes,
           // just whether activation causes problems
           int factor = 0;
           int i = 0;
           String fqn = null;

           Address addr1 = new Address();
           addr1.setStreet("1 Test Street");
           addr1.setCity("TestOne, CA");

           Address addr2 = new Address();
           addr2.setStreet("2 Test Street");
           addr2.setCity("TestTwo, CA");

           // people[j] is attached lazily; loaded[j] records whether it has been.
           Person[] people = new Person[SUBTREE_SIZE];
           boolean[] loaded = new boolean[SUBTREE_SIZE];
           for (int j = 0; j < SUBTREE_SIZE; j++)
           {
              Person p = new Person();
              p.setName("Person " + j);
              p.setAge(j);
              p.setAddress((j % 2 == 0) ? addr1 : addr2);
              people[j] = p;
           }

           // Starts out true because CacheUser.run() acquires the semaphore
           // before it calls useCache().
           boolean acquired = true;
           try
           {
              while (!stopped)
              {
                 if (i > 0)
                 {
                    // Not the first round: the permit was released at the end
                    // of the previous round and has to be taken again.
                    acquired = semaphore.attempt(60000);
                    if (!acquired)
                       throw new Exception(name + " cannot acquire semaphore");
                    log.info("TEST: " + name + " reacquired semaphore");
                    System.out.println("TEST: " + name + " reacquired semaphore");
                 }

                 int lastIndex = -1;
                 int index = -1;
                 while (!putsStopped)
                 {
                    // Ensure we don't operate on the same address twice in a row
                    // otherwise deadlock detection sometimes causes
                    // the _put for the second call to precede the commit
                    // for the first, leading to deadlock.  This seems like a
                    // JGroups bug, but the purpose of this test isn't to expose it
                    while (index % 2 == lastIndex % 2)
                    {
                       factor = random.nextInt(50);
                       index = factor % SUBTREE_SIZE;
                    }

                    lastIndex = index;

                    // The random factor doubles as the pause before the operation.
                    TestingUtil.sleepThread(factor);

                    fqn = "/" + name + "/" + String.valueOf(index);

                    // The operations below run without an explicit transaction;
                    // the tm.begin()/setRollbackOnly()/commit() wrapper is disabled.
                    if (!loaded[index])
                    {
                       cache.attach(fqn, people[index]);
                       loaded[index] = true;
                       log.info("TEST: " + name + " put Person at " + fqn);
                    }
                    else if (i % 2 == 0)
                    {
                       int newAge = factor / SUBTREE_SIZE;
                       people[index].setAge(newAge);
                    }
                    else
                    {
                       people[index].getAddress().setStreet(factor + " Test Street");
                    }

                    i++;
                 }

                 log.info("TEST: " + name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));

                 semaphore.release();
                 acquired = false;
                 // NOTE(review): CacheUser.run() keeps its own "acquired" flag and
                 // releases once more when useCache() returns -- looks like one
                 // extra release per stressor; confirm against the permit
                 // accounting of the calling test before changing it.

                 // Go to sleep until directed otherwise
                 while (!stopped && putsStopped)
                    TestingUtil.sleepThread(100);
              }
           }
           finally
           {
              if (acquired)
                 semaphore.release();
           }
        }

        // An overriding start() that also called cache.activateRegion("/" + name)
        // is disabled in this test.

        /**
         * Asks the worker to finish the current round of puts and release the semaphore.
         */
        public void stopPuts()
        {
           putsStopped = true;
           log.info("TEST: " + name + " putsStopped");
        }

        /**
         * Allows the worker to begin a new round of puts.
         */
        public void startPuts()
        {
           putsStopped = false;
        }

        /**
         * Tells the worker to terminate and interrupts it if it is still running.
         * Safe to call even when the stressor was never started.
         */
        public void stopThread()
        {
           stopped = true;
           // thread stays null until start() has been called.
           if (thread != null && thread.isAlive())
              thread.interrupt();
        }


     }
  
     /**
      * Base class for the worker objects of this test. Each instance creates its
      * own cache, and its {@link #run()} method takes the shared semaphore, calls
      * {@link #useCache()} and records any exception for the test to inspect.
      */
     private abstract class CacheUser implements Runnable
     {
        protected Semaphore semaphore;
        protected PojoCache cache;
        protected TransactionManager tm;
        protected String name;
        protected Exception exception;
        protected Thread thread;

        /**
         * Creates the cache for this user and verifies it has a TransactionManager.
         *
         * @param semaphore    semaphore taken in run() before the cache is used
         * @param name         name of this user, also passed to createCache
         * @param sync         passed through to createCache
         * @param activateRoot its negation is passed as the last createCache argument
         * @throws IllegalStateException if the cache has no TransactionManager
         */
        CacheUser(Semaphore semaphore,
                  String name,
                  boolean sync,
                  boolean activateRoot)
                throws Exception
        {
           this.cache = createCache(name, sync, true, false, !activateRoot);

           CacheSPI spi = (CacheSPI) this.cache.getCache();
           this.tm = spi.getTransactionManager();
           if (this.tm == null)
              throw new IllegalStateException("TransactionManager required");

           this.semaphore = semaphore;
           this.name = name;

           announce("TEST: Cache " + name + " started");
        }

        /**
         * Writes a progress message to both the log and standard out.
         */
        private void announce(String message)
        {
           log.info(message);
           System.out.println(message);
        }

        /**
         * Takes the semaphore (waiting up to 60 seconds), runs useCache() and
         * releases the semaphore again. Any exception is logged and stored.
         */
        public void run()
        {
           announce("TEST: " + name + " started");

           boolean holdsPermit = false;
           try
           {
              holdsPermit = semaphore.attempt(60000);
              if (holdsPermit)
              {
                 announce("TEST: " + name + " acquired semaphore");
                 useCache();
              }
              else
              {
                 throw new Exception(name + " cannot acquire semaphore");
              }
           }
           catch (Exception e)
           {
              log.error("TEST: " + name + ": " + e.getLocalizedMessage(), e);

              // Keep the failure so the test can check it via getException()
              exception = e;
           }
           finally
           {
              if (holdsPermit)
              {
                 semaphore.release();
              }
           }
        }

        /**
         * The work this user performs once run() has taken the semaphore.
         */
        abstract void useCache() throws Exception;

        /**
         * @return the exception caught by run(), or null if none was caught
         */
        public Exception getException()
        {
           return exception;
        }

        /**
         * @return the Cache obtained from this user's PojoCache
         */
        public Cache getCache()
        {
           return cache.getCache();
        }

        /**
         * Starts a new thread that executes run().
         */
        public void start() throws Exception
        {
           Thread worker = new Thread(this);
           thread = worker;
           worker.start();
        }

        /**
         * Interrupts the worker thread if one was started and is still alive.
         */
        public void cleanup()
        {
           if (thread == null)
           {
              return;
           }
           if (thread.isAlive())
           {
              thread.interrupt();
           }
        }
     }
  }
  
  
  
  1.1      date: 2006/10/10 13:35:47;  author: bwang;  state: Exp;JBossCache/tests-50/functional/org/jboss/cache/pojo/statetransfer/StateTransfer200AopTest.java
  
  Index: StateTransfer200AopTest.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.cache.pojo.statetransfer;
  
  /**
   * Runs the PojoCache state transfer tests inherited from
   * StateTransferAopTestBase with the replication version 2.0.0.GA.
   *
   * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
   * @version $Revision: 1.1 $
   */
  public class StateTransfer200AopTest extends StateTransferAopTestBase
  {
     /** Replication version handed to the base test. */
     private static final String REPLICATION_VERSION = "2.0.0.GA";

     /**
      * @return the replication version this test variant runs with
      */
     protected String getReplicationVersion()
     {
        return REPLICATION_VERSION;
     }

  }
  
  
  
  1.1      date: 2006/10/10 13:35:47;  author: bwang;  state: Exp;JBossCache/tests-50/functional/org/jboss/cache/pojo/statetransfer/ReplicatedTest.java
  
  Index: ReplicatedTest.java
  ===================================================================
  /*
   * JBoss, Home of Professional Open Source.
   * Copyright 2006, Red Hat Middleware LLC, and individual contributors
   * as indicated by the @author tags. See the copyright.txt file in the
   * distribution for a full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
   * the License, or (at your option) any later version.
   *
   * This software is distributed in the hope that it will be useful,
   * but WITHOUT ANY WARRANTY; without even the implied warranty of
   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   * Lesser General Public License for more details.
   *
   * You should have received a copy of the GNU Lesser General Public
   * License along with this software; if not, write to the Free
   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   */
  
  package org.jboss.cache.pojo.statetransfer;
  
  import junit.framework.TestCase;
  import junit.framework.Test;
  import junit.framework.TestSuite;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jboss.cache.pojo.PojoCache;
  import org.jboss.cache.pojo.PojoCacheFactory;
  import org.jboss.cache.pojo.test.Person;
  import org.jboss.cache.pojo.test.Student;
  
  import java.util.List;
  
  /**
   * Simple replicated test for state transfer: a Person is attached to one
   * cache, a second cache is created from the same configuration afterwards,
   * and the Person is expected to be found in the second cache.
   *
   * @author Ben Wang
   */
  public class ReplicatedTest extends TestCase
  {
     Log log = LogFactory.getLog(ReplicatedTest.class);
     PojoCache cache, cache1;


     public ReplicatedTest(String name)
     {
        super(name);
     }

     protected void setUp() throws Exception
     {
        super.setUp();
        log.info("setUp() ....");
     }

     /**
      * Stops whichever of the two caches were actually created. A test that
      * fails before a cache exists must not be masked by a
      * NullPointerException here, and a failure stopping the first cache must
      * not leave the second one running.
      */
     protected void tearDown() throws Exception
     {
        super.tearDown();
        try
        {
           if (cache != null)
              cache.stop();
        }
        finally
        {
           if (cache1 != null)
              cache1.stop();
           cache = null;
           cache1 = null;
        }
     }

     /**
      * Creates a Person and attaches it to the first cache under the given id.
      */
     private Person createPerson(String id, String name, int age)
     {
        Person p = new Person();
        p.setName(name);
        p.setAge(age);
        cache.attach(id, p);
        return p;
     }

     /**
      * Creates a Student and attaches it to the first cache under the given id.
      */
     private Student createStudent(String id, String name, int age, String grade)
     {
        Student p = new Student();
        p.setName(name);
        p.setAge(age);
        p.setYear(grade);
        cache.attach(id, p);
        return p;
     }

     /**
      * Attaches a Person to the first cache, brings up a second cache and
      * verifies the Person can be found there with the same name.
      */
     public void testSimple() throws Exception
     {
        String configFile = "META-INF/replSync-service.xml";
        boolean toStart = true;
        cache = PojoCacheFactory.createInstance(configFile, toStart);
        Person ben = createPerson("/person/test1", "Ben Wang", 40);

        System.out.println("\n*** I ***");
        System.out.println(((org.jboss.cache.TreeCacheProxyImpl)cache.getCache()).printDetails());
        cache1 = PojoCacheFactory.createInstance(configFile, toStart);
        // NOTE(review): createInstance was already asked to start the cache
        // (toStart == true); the explicit start() is kept as-is -- confirm
        // whether it is still needed.
        cache1.start();

        System.out.println("\n*** II ***");
        System.out.println(((org.jboss.cache.TreeCacheProxyImpl)cache1.getCache()).printDetails());

        Person p = (Person)cache1.find("/person/test1");

        log.info("testSimple() ....");
        assertEquals("Ben Wang", ben.getName());
        // Fail with a readable message instead of a NullPointerException when
        // the Person did not reach the second cache.
        assertNotNull("Person /person/test1 not found in second cache", p);
        assertEquals("Ben Wang", p.getName());
        cache.detach("/person/test1");
     }

     public static Test suite() throws Exception
     {
        return new TestSuite(ReplicatedTest.class);
     }


     public static void main(String[] args) throws Exception
     {
        junit.textui.TestRunner.run(suite());
     }

  }
  
  
  



More information about the jboss-cvs-commits mailing list