[jboss-cvs] JBossCache/tests/functional/org/jboss/cache/statetransfer ...

Vladimir Blagojevic vladimir.blagojevic at jboss.com
Sat Jul 21 13:00:42 EDT 2007


  User: vblagojevic
  Date: 07/07/21 13:00:42

  Modified:    tests/functional/org/jboss/cache/statetransfer    
                        StateTransferTestBase.java
                        StateTransfer200Test.java
  Added:       tests/functional/org/jboss/cache/statetransfer    
                        StateTransferConcurrencyTest.java
  Removed:     tests/functional/org/jboss/cache/statetransfer    
                        VersionedTestBase.java
  Log:
  [JBCACHE-983] - State transfer test failures ( to do with concurrent eviction and activation )
  
  Revision  Changes    Path
  1.40      +21 -1     JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransferTestBase.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransferTestBase.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransferTestBase.java,v
  retrieving revision 1.39
  retrieving revision 1.40
  diff -u -b -r1.39 -r1.40
  --- StateTransferTestBase.java	14 Jun 2007 15:30:14 -0000	1.39
  +++ StateTransferTestBase.java	21 Jul 2007 17:00:42 -0000	1.40
  @@ -26,6 +26,7 @@
   import org.jboss.cache.CacheSPI;
   import org.jboss.cache.DefaultCacheFactory;
   import org.jboss.cache.Fqn;
  +import org.jboss.cache.Region;
   import org.jboss.cache.config.Configuration;
   import org.jboss.cache.config.Configuration.CacheMode;
   import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
  @@ -49,9 +50,21 @@
    */
   public abstract class StateTransferTestBase extends AbstractCacheLoaderTestBase
   {
  +
  +   	
  +   protected static final int SUBTREE_SIZE = 10;
  +
  +   public static final Fqn A = Fqn.fromString("/a");
  +   public static final Fqn B = Fqn.fromString("/b");
  +   public static final Fqn C = Fqn.fromString("/c");
  +
  +   protected static final String ADDRESS_CLASSNAME = "org.jboss.cache.marshall.data.Address";
  +   protected static final String PERSON_CLASSNAME = "org.jboss.cache.marshall.data.Person";
  +
      public static final Fqn A_B = Fqn.fromString("/a/b");
      public static final Fqn A_C = Fqn.fromString("/a/c");
      public static final Fqn A_D = Fqn.fromString("/a/d");
  +   
      public static final String JOE = "JOE";
      public static final String BOB = "BOB";
      public static final String JANE = "JANE";
  @@ -140,6 +153,13 @@
         return tree;
      }
   
  +   protected void createAndActivateRegion(CacheSPI c, Fqn f)
  +   {
  +      Region r = c.getRegion(f, true);
  +      r.registerContextClassLoader(getClass().getClassLoader());
  +      r.activate();
  +   }
  +
      /**
       * Provides a hook for multiplexer integration. This default implementation
       * is a no-op; subclasses that test mux integration would override
  
  
  
  1.21      +308 -2    JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransfer200Test.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: StateTransfer200Test.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransfer200Test.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -b -r1.20 -r1.21
  --- StateTransfer200Test.java	18 Jul 2007 20:35:00 -0000	1.20
  +++ StateTransfer200Test.java	21 Jul 2007 17:00:42 -0000	1.21
  @@ -7,12 +7,16 @@
   
   package org.jboss.cache.statetransfer;
   
  +import java.lang.reflect.Method;
  +
   import org.jboss.cache.CacheImpl;
   import org.jboss.cache.CacheSPI;
   import org.jboss.cache.Fqn;
  +import org.jboss.cache.Region;
   import org.jboss.cache.buddyreplication.BuddyManager;
   import org.jboss.cache.config.BuddyReplicationConfig;
   import org.jboss.cache.factories.XmlConfigurationParser;
  +import org.jboss.cache.loader.CacheLoader;
   import org.jboss.cache.misc.TestingUtil;
   import org.w3c.dom.Document;
   import org.w3c.dom.Element;
  @@ -24,9 +28,9 @@
    * Tests that state transfer works properly if the version is 2.0.0.GA.
    *
    * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
  - * @version $Revision: 1.20 $
  + * @version $Revision: 1.21 $
    */
  -public class StateTransfer200Test extends VersionedTestBase
  +public class StateTransfer200Test extends StateTransferTestBase
   {
   
      protected String getReplicationVersion()
  @@ -114,6 +118,308 @@
         assertFalse("/a/b is not in cache loader ", cache2.getCacheLoaderManager().getCacheLoader().exists(A_B));
      }
   
  +   public void testLoadEntireStateAfterStart() throws Exception
  +   {
  +      CacheSPI cache1 = createCache("cache1", false, true, true);
  +
  +      createAndActivateRegion(cache1, Fqn.ROOT);
  +
  +      cache1.put(A_B, "name", JOE);
  +      cache1.put(A_B, "age", TWENTY);
  +      cache1.put(A_C, "name", BOB);
  +      cache1.put(A_C, "age", FORTY);
  +
  +      CacheSPI cache2 = createCache("cache2", false, true, true);
  +
  +      // Pause to give caches time to see each other
  +      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
  +
  +      CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
  +
  +      assertNull("/a/b transferred to loader against policy", loader.get(A_B));
  +
  +      assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
  +      assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
  +      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
  +      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
  +
  +      createAndActivateRegion(cache2, Fqn.ROOT);
  +
  +      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
  +      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
  +      assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
  +      assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
  +      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
  +      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
  +   }
  +
  +   
  +   public void testInitialStateTransfer() throws Exception
  +   {
  +      CacheSPI cache1 = createCache("cache1", false, false, false);
  +
  +      cache1.put(A_B, "name", JOE);
  +      cache1.put(A_B, "age", TWENTY);
  +      cache1.put(A_C, "name", BOB);
  +      cache1.put(A_C, "age", FORTY);
  +
  +      CacheSPI cache2 = createCache("cache2", false, false, false);
  +
  +      // Pause to give caches time to see each other
  +      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
  +      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
  +      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
  +   }
  +   
  +   public void testInitialStateTferWithLoader() throws Exception
  +   {
  +      initialStateTferWithLoaderTest(false);
  +   }
  +
  +   public void testInitialStateTferWithAsyncLoader() throws Exception
  +   {
  +      initialStateTferWithLoaderTest(true);
  +   }
  +
  +   protected void initialStateTferWithLoaderTest(boolean asyncLoader) throws Exception
  +   {
  +      initialStateTferWithLoaderTest("org.jboss.cache.loader.FileCacheLoader",
  +              "org.jboss.cache.loader.FileCacheLoader", asyncLoader);
  +   }
  +
  +   public void testPartialStateTransfer() throws Exception
  +   {
  +      CacheSPI cache1 = createCache("cache1", false, true, false);
  +
  +      createAndActivateRegion(cache1, A);
  +
  +      cache1.put(A_B, "name", JOE);
  +      cache1.put(A_B, "age", TWENTY);
  +      cache1.put(A_C, "name", BOB);
  +      cache1.put(A_C, "age", FORTY);
  +
  +      CacheSPI cache2 = createCache("cache2", false, true, false);
  +
  +      // Pause to give caches time to see each other
  +      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
  +
  +      assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
  +      assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
  +      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
  +      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
  +
  +      createAndActivateRegion(cache2, A_B);
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
  +      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
  +      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
  +
  +      cache1.put(A_D, "name", JANE);
  +
  +      assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
  +
  +      createAndActivateRegion(cache2, A_C);
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
  +      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
  +      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
  +      assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
  +
  +      createAndActivateRegion(cache2, A_D);
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
  +      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
  +      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
  +      assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));
  +
  +
  +      cache1.getRegion(A, false).deactivate();
  +      createAndActivateRegion(cache1, A_B);
  +      createAndActivateRegion(cache1, A_C);
  +      createAndActivateRegion(cache1, A_D);
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
  +      assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
  +      assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
  +      assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
  +
  +   }
  +   
  +   public void testPartialStateTferWithLoader() throws Exception
  +   {
  +      CacheSPI cache1 = createCache("cache1", false, true, true);
  +
  +      createAndActivateRegion(cache1, A);
  +
  +      cache1.put(A_B, "name", JOE);
  +      cache1.put(A_B, "age", TWENTY);
  +      cache1.put(A_C, "name", BOB);
  +      cache1.put(A_C, "age", FORTY);
  +
  +      CacheSPI cache2 = createCache("cache2", false, true, true);
  +
  +      // Pause to give caches time to see each other
  +      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
  +
  +      CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
  +
  +      assertNull("/a/b transferred to loader against policy", loader.get(A_B));
  +
  +      assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
  +      assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
  +      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
  +      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
  +
  +      createAndActivateRegion(cache2, A_B);
  +
  +      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
  +      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
  +      assertNull("/a/c transferred to loader against policy", loader.get(A_C));
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
  +      assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
  +      assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
  +
  +      cache1.put(A_D, "name", JANE);
  +
  +      assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
  +
  +      createAndActivateRegion(cache2, A_C);
  +
  +      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
  +      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
  +      assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
  +      assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
  +      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
  +      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
  +      assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
  +
  +      createAndActivateRegion(cache2, A_D);
  +
  +      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
  +      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
  +      assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
  +      assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
  +      assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
  +      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
  +      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
  +      assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));
  +
  +      cache1.getRegion(A, false).deactivate();
  +
  +      createAndActivateRegion(cache1, A_B);
  +      createAndActivateRegion(cache1, A_C);
  +      createAndActivateRegion(cache1, A_D);
  +
  +      loader = cache1.getCacheLoaderManager().getCacheLoader();
  +
  +      assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
  +      assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
  +      assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
  +      assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
  +      assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));
  +
  +      assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
  +      assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
  +      assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
  +      assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
  +      assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
  +   }
  +
  +   public void testPartialStateTferWithClassLoader() throws Exception
  +   {
  +      // FIXME: This test is meaningless because MarshalledValueInputStream
  +      // will find the classes w/ their own loader if TCL can't.  Need
  +      // to find a way to test!
  +      // But, at least it tests JBCACHE-305 by registering a classloader
  +      // both before and after start()
  +
  +      // Set the TCL to a classloader that can't see Person/Address
  +      Thread.currentThread().setContextClassLoader(getNotFoundClassLoader());
  +
  +      CacheSPI cache1 = createCache("cache1",
  +              false, // async
  +              true, // use marshaller
  +              true, // use cacheloader
  +              false, false);// don't start
  +      ClassLoader cl1 = getClassLoader();
  +      cache1.getRegion(A, true).registerContextClassLoader(cl1);
  +      startCache(cache1);
  +
  +      cache1.getRegion(A, true).activate();
  +
  +      Object ben = createBen(cl1);
  +
  +      cache1.put(A_B, "person", ben);
  +
  +      // For cache 2 we won't register loader until later
  +      CacheSPI cache2 = createCache("cache2",
  +              false, // async
  +              true, // use marshalling
  +              true, // use cacheloader
  +              false, true);// start
  +
  +      // Pause to give caches time to see each other
  +      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
  +
  +      CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
  +
  +      assertNull("/a/b not transferred to loader", loader.get(A_B));
  +
  +      assertNull("/a/b not transferred to cache", cache2.get(A_B, "person"));
  +
  +      ClassLoader cl2 = getClassLoader();
  +
  +      //      cache2.registerClassLoader(A, cl2);
  +      Region r = cache2.getRegion(A, true);
  +      r.registerContextClassLoader(cl2);
  +
  +      r.activate();
  +
  +      assertEquals("Correct state from loader for /a/b", ben.toString(), loader.get(A_B).get("person").toString());
  +
  +      assertEquals("Correct state from cache for /a/b", ben.toString(), cache2.get(A_B, "person").toString());
  +
  +   }
  +   
  +   private Object createBen(ClassLoader loader) throws Exception
  +   {
  +      Class addrClazz = loader.loadClass(ADDRESS_CLASSNAME);
  +      Method setCity = addrClazz.getMethod("setCity", new Class[]{String.class});
  +      Method setStreet = addrClazz.getMethod("setStreet", new Class[]{String.class});
  +      Method setZip = addrClazz.getMethod("setZip", new Class[]{int.class});
  +      Object addr = addrClazz.newInstance();
  +      setCity.invoke(addr, new Object[]{"San Jose"});
  +      setStreet.invoke(addr, new Object[]{"1007 Home"});
  +      setZip.invoke(addr, new Object[]{90210});
  +
  +      Class benClazz = loader.loadClass(PERSON_CLASSNAME);
  +      Method setName = benClazz.getMethod("setName", new Class[]{String.class});
  +      Method setAddress = benClazz.getMethod("setAddress", new Class[]{addrClazz});
  +      Object ben = benClazz.newInstance();
  +      setName.invoke(ben, new Object[]{"Ben"});
  +      setAddress.invoke(ben, new Object[]{addr});
  +
  +      return ben;
  +   }
  +
   
      private BuddyReplicationConfig getBuddyConfig() throws Exception
      {
  
  
  
  1.1      date: 2007/07/21 17:00:42;  author: vblagojevic;  state: Exp;JBossCache/tests/functional/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java
  
  Index: StateTransferConcurrencyTest.java
  ===================================================================
  /*
   * JBoss, the OpenSource J2EE webOS
   *
   * Distributable under LGPL license.
   * See terms of license at gnu.org.
   */
  
  package org.jboss.cache.statetransfer;
  
  import org.jboss.cache.Cache;
  import org.jboss.cache.CacheException;
  import org.jboss.cache.CacheSPI;
  import org.jboss.cache.DefaultCacheFactory;
  import org.jboss.cache.Fqn;
  import org.jboss.cache.Node;
  import org.jboss.cache.Region;
  import org.jboss.cache.config.Configuration;
  import org.jboss.cache.config.Configuration.CacheMode;
  import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
  import org.jboss.cache.marshall.InactiveRegionException;
  import org.jboss.cache.misc.TestingUtil;
  
  import java.util.Random;
  import java.util.Set;
  import java.util.concurrent.Semaphore;
  import java.util.concurrent.TimeUnit;
  
  /**
   * Tests of CacheSPI's state transfer capability under concurrent
   * region activation and concurrent cache use.
   * <p/>
   * TODO add tests with classloader regions
   *
   * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
   * @version $Id: StateTransferConcurrencyTest.java,v 1.1 2007/07/21 17:00:42 vblagojevic Exp $
   */
  public class StateTransferConcurrencyTest extends StateTransferTestBase
  {
     
     /**
      * Supplies the replication version string used by this test class.
      * NOTE(review): presumably consumed by the superclass when it builds
      * cache configurations -- confirm against StateTransferTestBase.
      *
      * @return the fixed version string "2.0.0.GA"
      */
     protected String getReplicationVersion()
     {
        return "2.0.0.GA";
     }
  
     /**
      * Tests concurrent activation of the same subtree by multiple nodes in a
      * REPL_SYNC environment.  The idea is to see what would happen with a
      * farmed deployment. See <code>concurrentActivationTest</code> for details.
      *
      * @throws Exception if <code>concurrentActivationTest</code> propagates one
      */
     public void testConcurrentActivationSync() throws Exception
     {
        // true == the "sync" flag of the shared test routine
        concurrentActivationTest(true);
     }
  
     /**
      * Tests concurrent activation of the same subtree by multiple nodes in a
      * REPL_ASYNC environment.  The idea is to see what would happen with a
      * farmed deployment. See <code>concurrentActivationTest</code> for details.
      *
      * @throws Exception if <code>concurrentActivationTest</code> propagates one
      */
     public void testConcurrentActivationAsync() throws Exception
     {
        // false == the "sync" flag of the shared test routine is off
        concurrentActivationTest(false);
     }
  
     /**
      * Starts 5 caches and then concurrently activates the same region under
      * all 5, causing each to attempt a partial state transfer from the others.
      * As soon as each cache has activated its region, it does a put to a node
      * in the region, thus complicating the lives of the other caches trying
      * to get partial state.
      * <p/>
      * Failure condition is if any node sees an exception or if the final state
      * of all caches is not consistent. An InactiveRegionException recorded by
      * an activator (directly or as its immediate cause) is tolerated.
      * <p/>
      * Activators are cleaned up in a finally block. Slots that were never
      * populated because setup failed part-way are skipped, so cleanup cannot
      * replace the original failure with a NullPointerException.
      *
      * @param sync use REPL_SYNC or REPL_ASYNC
      * @throws Exception anything escaping activator cleanup; exceptions raised
      *                   in the test body itself are reported through fail()
      */
     private void concurrentActivationTest(boolean sync) throws Exception
     {
        String[] names = {"A", "B", "C", "D", "E"};
        int count = names.length;
        CacheActivator[] activators = new CacheActivator[count];

        try
        {
           // Create a semaphore and take all its tickets
           Semaphore semaphore = new Semaphore(count);
           for (int i = 0; i < count; i++)
           {
              semaphore.acquire();
           }

           // Create activation threads that will block on the semaphore
           CacheSPI[] caches = new CacheSPI[count];
           for (int i = 0; i < count; i++)
           {
              activators[i] = new CacheActivator(semaphore, names[i], sync);
              caches[i] = activators[i].getCacheSPI();
              activators[i].start();
           }

           // Make sure everyone is in sync
           TestingUtil.blockUntilViewsReceived(caches, 60000);

           // Release the semaphore to allow the threads to start work
           semaphore.release(count);

           // Sleep to ensure the threads get all the semaphore tickets
           TestingUtil.sleepThread((long) 1000);

           // Reacquire the semaphore tickets; when we have them all
           // we know the threads are done
           for (int i = 0; i < count; i++)
           {
              boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
              if (!acquired)
              {
                 fail("failed to acquire semaphore " + i);
              }
           }

           // Sleep to allow any async calls to clear
           if (!sync)
           {
              TestingUtil.sleepThread(1000);
           }

           // Ensure the caches held by the activators see all the values
           for (int i = 0; i < count; i++)
           {
              Exception aException = activators[i].getException();
              boolean gotUnexpectedException = aException != null
                      && !(aException instanceof InactiveRegionException ||
                      aException.getCause() instanceof InactiveRegionException);
              if (gotUnexpectedException)
              {
                 fail("Activator " + names[i] + " caught an exception " + aException);
              }

              for (int j = 0; j < count; j++)
              {
                 Fqn fqn = new Fqn(A_B, names[j]);
                 assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
                         "VALUE", activators[i].getCacheValue(fqn));
              }
           }
        }
        catch (Exception ex)
        {
           // Report the exception itself (type + message); getLocalizedMessage()
           // alone may be null and hides what actually went wrong.
           fail("Unexpected exception: " + ex);
        }
        finally
        {
           for (int i = 0; i < count; i++)
           {
              // A slot is null when activator construction failed before
              // reaching it; skip it so the original failure is not masked.
              if (activators[i] != null)
              {
                 activators[i].cleanup();
              }
           }
        }

     }
  
     /**
      * Starts two caches where each cache has N regions. We put some data in each of the regions.
      * We run two threads where each thread creates a cache then goes into a loop where it
      * activates the N regions, with a 1 sec pause between activations.
      * <p/>
      * Threads are started with 10 sec difference.
      * <p/>
      * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
      * then deploying webapps.
      * <p/>
      * Failure condition is if any node sees an exception or if the final state
      * of all caches is not consistent. An InactiveRegionException recorded by
      * an activator (directly or as its immediate cause) is tolerated.
      * <p/>
      * Activators are cleaned up in a finally block. Slots that were never
      * populated because setup failed part-way are skipped, so cleanup cannot
      * replace the original failure with a NullPointerException.
      *
      * @param sync use REPL_SYNC or REPL_ASYNC
      * @throws Exception anything escaping activator cleanup; exceptions raised
      *                   in the test body itself are reported through fail()
      */
     private void concurrentActivationTest2(boolean sync) throws Exception
     {
        String[] names = {"A", "B"};
        int count = names.length;
        int regionsToActivate = 15;
        int sleepTimeBetweenNodeStarts = 10000;
        StaggeredWebDeployerActivator[] activators = new StaggeredWebDeployerActivator[count];
        try
        {
           // Create a semaphore and take all its tickets
           Semaphore semaphore = new Semaphore(count);
           for (int i = 0; i < count; i++)
           {
              semaphore.acquire();
           }

           // Create activation threads that will block on the semaphore
           CacheSPI[] caches = new CacheSPI[count];
           for (int i = 0; i < count; i++)
           {
              activators[i] = new StaggeredWebDeployerActivator(semaphore, names[i], sync, regionsToActivate);
              caches[i] = activators[i].getCacheSPI();

              // Release the semaphore to allow the thread to start working
              semaphore.release(1);

              activators[i].start();
              // Stagger the start of the next node
              TestingUtil.sleepThread(sleepTimeBetweenNodeStarts);
           }

           // Make sure everyone is in sync
           TestingUtil.blockUntilViewsReceived(caches, 60000);

           // Sleep to ensure the threads get all the semaphore tickets
           TestingUtil.sleepThread(1000);

           // Reacquire the semaphore tickets; when we have them all
           // we know the threads are done
           for (int i = 0; i < count; i++)
           {
              boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
              if (!acquired)
              {
                 fail("failed to acquire semaphore " + i);
              }
           }

           // Sleep to allow any async calls to clear
           if (!sync)
           {
              TestingUtil.sleepThread(1000);
           }

           // Ensure the caches held by the activators see all the values
           for (int i = 0; i < count; i++)
           {
              Exception aException = activators[i].getException();
              boolean gotUnexpectedException = aException != null
                      && !(aException instanceof InactiveRegionException ||
                      aException.getCause() instanceof InactiveRegionException);
              if (gotUnexpectedException)
              {
                 fail("Activator " + names[i] + " caught an exception " + aException);
              }

              // NOTE(review): the loop index j is not used when building the
              // Fqn, so the same node is checked on every pass. Presumably the
              // region index was meant to be j -- confirm against
              // StaggeredWebDeployerActivator before changing the path.
              for (int j = 0; j < regionsToActivate; j++)
              {
                 Fqn fqn = Fqn.fromString("/a/" + i + "/" + names[i]);
                 assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
                         "VALUE", activators[i].getCacheValue(fqn));
              }
           }
        }
        catch (Exception ex)
        {
           // Report the exception itself (type + message); getLocalizedMessage()
           // alone may be null and hides what actually went wrong.
           fail("Unexpected exception: " + ex);
        }
        finally
        {
           for (int i = 0; i < count; i++)
           {
              // A slot is null when activator construction failed before
              // reaching it; skip it so the original failure is not masked.
              if (activators[i] != null)
              {
                 activators[i].cleanup();
              }
           }
        }

     }
  
     /**
      * Runs the staggered-startup activation scenario with the "sync" flag off
      * (REPL_ASYNC). See <code>concurrentActivationTest2</code> for details.
      * <p/>
      * Starts two caches where each cache has N regions. We put some data in each of the regions.
      * We run two threads where each thread creates a cache then goes into a loop where it
      * activates the N regions, with a 1 sec pause between activations.
      * <p/>
      * Threads are started with 10 sec difference.
      * <p/>
      * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
      * then deploying webapps.
      * <p/>
      * Failure condition is if any node sees an exception or if the final state
      * of all caches is not consistent.
      *
      * @throws Exception if <code>concurrentActivationTest2</code> propagates one
      */
     public void testConcurrentStartupActivationAsync() throws Exception
     {
        concurrentActivationTest2(false);
     }
  
     /**
      * Runs the staggered-startup activation scenario with the "sync" flag on
      * (REPL_SYNC). See <code>concurrentActivationTest2</code> for details.
      * <p/>
      * Starts two caches where each cache has N regions. We put some data in each of the regions.
      * We run two threads where each thread creates a cache then goes into a loop where it
      * activates the N regions, with a 1 sec pause between activations.
      * <p/>
      * Threads are started with 10 sec difference.
      * <p/>
      * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
      * then deploying webapps.
      * <p/>
      * Failure condition is if any node sees an exception or if the final state
      * of all caches is not consistent.
      *
      * @throws Exception if <code>concurrentActivationTest2</code> propagates one
      */
     public void testConcurrentStartupActivationSync() throws Exception
     {
        concurrentActivationTest2(true);
     }
  
     /**
      * Tests partial state transfer under heavy concurrent load and REPL_SYNC.
      * See <code>concurrentUseTest</code> for details.
      *
      * @throws Exception if <code>concurrentUseTest</code> propagates one
      */
     public void testConcurrentUseSync() throws Exception
     {
        // true == the "sync" flag of the shared test routine
        concurrentUseTest(true);
     }
  
     /**
      * Tests partial state transfer under heavy concurrent load and REPL_ASYNC.
      * See <code>concurrentUseTest</code> for details.
      *
      * @throws Exception if <code>concurrentUseTest</code> propagates one
      */
     public void testConcurrentUseAsync() throws Exception
     {
        // false == the "sync" flag of the shared test routine is off
        concurrentUseTest(false);
     }
  
     /**
      * Initiates 5 caches, 4 with active trees and one with an inactive tree.
      * Each of the active caches begins rapidly generating puts against nodes
      * in a subtree for which it is responsible. The 5th cache activates
      * each subtree, and at the end confirms no node saw any exceptions and
      * that each node has consistent state.
      *
      * @param sync whether to use REPL_SYNC or REPL_ASYNC
      * @throws Exception
      */
     private void concurrentUseTest(boolean sync) throws Exception
     {
        String[] names = {"B", "C", "D", "E"};
        int count = names.length;
        CacheStressor[] stressors = new CacheStressor[count];
  
        try
        {
  
           // The first cache we create is inactivated.
           CacheSPI cacheA = createCache("cacheA", sync, true, false);
  
           CacheSPI[] caches = new CacheSPI[count + 1];
           caches[0] = cacheA;
  
           // Create a semaphore and take all its tickets
           Semaphore semaphore = new Semaphore(count);
           for (int i = 0; i < count; i++)
           {
              semaphore.acquire();
           }
  
           // Create stressor threads that will block on the semaphore
  
           for (int i = 0; i < count; i++)
           {
              stressors[i] = new CacheStressor(semaphore, names[i], sync);
              caches[i + 1] = stressors[i].getCacheSPI();
              stressors[i].start();
           }
  
           // Make sure everyone's views are in sync
           TestingUtil.blockUntilViewsReceived(caches, 60000);
  
           // Repeat the basic test four times
           //for (int x = 0; x < 4; x++)
           for (int x = 0; x < 1; x++)
           {
              
  		    // Reset things by inactivating the region 
  		    // and enabling the stressors
  		    for (int i = 0; i < count; i++)
  		    {
  			   Region r = cacheA.getRegion(Fqn.fromString("/" + names[i]), true);
  			   r.registerContextClassLoader(getClass().getClassLoader());            	 
  			   r.deactivate();
  			   System.out.println("Run " + x + "-- /" + names[i] + " deactivated on A");
  		       stressors[i].startPuts();
  		    }
  		    
              // Release the semaphore to allow the threads to start work 
              semaphore.release(count);
  
              // Sleep to ensure the threads get all the semaphore tickets
              // and to ensure puts are actively in progress
              TestingUtil.sleepThread((long) 300);
  
              // Activate cacheA
              for (int i = 0; i < count; i++)
              {
                 //              System.out.println("Activating /" + names[i] + " on A");
                 cacheA.getRegion(Fqn.fromString("/" + names[i]), true).activate();
                 // Stop the stressor so we don't pollute cacheA's state
                 // with too many messages sent after activation -- we want
                 // to compare transferred state with the sender
                 stressors[i].stopPuts();
                 System.out.println("Run " + x + "-- /" + names[i] + " activated on A");
                 // Reacquire one semaphore ticket
                 boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
                 if (!acquired)
                 {
                    fail("failed to acquire semaphore " + i);
                 }
  
                 // Pause to allow other work to proceed
                 TestingUtil.sleepThread(100);
              }
  
              // Sleep to allow any in transit msgs to clear
              //            if (!sync)
              TestingUtil.sleepThread(1000);
  
              // Ensure the stressors saw no exceptions
              for (int i = 0; i < count; i++)
              {
                 if (stressors[i].getException() != null && !(stressors[i].getException() instanceof InactiveRegionException))
                 {
                    fail("Stressor " + names[i] + " caught an exception " + stressors[i].getException());
                 }
  
              }
  
              TestingUtil.sleepThread(1000);
  
              // Compare cache contents
              for (int i = 0; i < count; i++)
              {
                 for (int j = 0; j < SUBTREE_SIZE; j++)
                 {
                    Fqn fqn = Fqn.fromString("/" + names[i] + "/" + j);
                    assertEquals("/A/" + j + " matches " + fqn,
                            cacheA.get(fqn, "KEY"),
                            stressors[i].getCacheSPI().get(fqn, "KEY"));
                 }
              }
           }
  
           for (int i = 0; i < count; i++)
           {
              stressors[i].stopThread();
           }
  
        }
        finally
        {
           for (int i = 0; i < count; i++)
           {
              if (stressors[i] != null)
              {
                 stressors[i].cleanup();
              }
           }
        }
  
     }
  
     /**
      * Test for JBCACHE-913.
      * <p/>
      * Starts a REPL_SYNC cache, stores one value under /a/b/c, then starts
      * a second cache and checks that the root region of the second cache
      * has three node events queued (one each for /a, /a/b and /a/b/c).
      *
      * @throws Exception if either cache cannot be created or started
      */
     public void testEvictionSeesStateTransfer() throws Exception
     {
        Configuration firstCfg = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
        Cache first = DefaultCacheFactory.getInstance().createCache(firstCfg, false);
        first.start();
        caches.put("evict1", first);

        first.put(Fqn.fromString("/a/b/c"), "key", "value");

        Configuration secondCfg = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
        Cache second = DefaultCacheFactory.getInstance().createCache(secondCfg, false);
        second.start();
        caches.put("evict2", second);

        // Events are expected for /a, /a/b and /a/b/c.
        Region rootRegion = second.getRegion(Fqn.ROOT, false);
        int queuedEvents = rootRegion.nodeEventQueueSize();
        assertEquals("Saw the expected number of node events", 3, queuedEvents);
     }
  
     /**
      * Further test for JBCACHE-913.
      * <p/>
      * Fills a first cache with 25000 nodes under /base and 5 nodes under
      * /org/jboss/test/data, then starts a second cache and checks that the
      * data arrived. Two writer threads (one per cache) then keep adding
      * nodes to both subtrees while this thread polls the second cache for
      * up to 10 seconds, looking for a drop in the child count of each
      * subtree. After the writers stop, the test waits for further eviction
      * runs and checks the remaining child counts.
      *
      * @throws Exception if a cache cannot be created or started, or if the
      *                   test thread is interrupted while joining the writers
      */
     public void testEvictionAfterStateTransfer() throws Exception
     {
        Configuration c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
        Cache cache1 = DefaultCacheFactory.getInstance().createCache(c, false);
        cache1.start();
        caches.put("evict1", cache1);

        for (int i = 0; i < 25000; i++)
        {
           cache1.put(Fqn.fromString("/base/" + i), "key", "base" + i);
           if (i < 5)
           {
              cache1.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
           }
        }

        c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
        final Cache cache2 = DefaultCacheFactory.getInstance().createCache(c, false);
        cache2.start();
        caches.put("evict2", cache2);

        Node parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
        Set children = parent.getChildren();
        assertEquals("All data children transferred", 5, children.size());
        parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
        children = parent.getChildren();
        assertTrue("Minimum number of base children transferred", children.size() >= 5000);

        // Sleep 2.5 secs so the nodes we are about to create in data won't
        // exceed the 4 sec TTL when eviction thread runs
        TestingUtil.sleepThread(2500);

        /**
         * Writer thread: keeps adding one /base node and one data node per
         * iteration until asked to stop, remembering the last exception seen.
         */
        class Putter extends Thread
        {
           Cache cache = null;
           // Both fields are shared with the test thread, so they must be
           // volatile for the writes to be visible across threads.
           volatile boolean stopped = false;
           volatile Exception ex = null;

           public void run()
           {
              int i = 25000;
              while (!stopped)
              {
                 try
                 {
                    cache.put(Fqn.fromString("/base/" + i), "key", "base" + i);
                    cache.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
                    i++;
                 }
                 catch (Exception e)
                 {
                    ex = e;
                 }
              }
           }
        }
        Putter p1 = new Putter();
        p1.cache = cache1;
        Putter p2 = new Putter();
        p2.cache = cache2;

        boolean sawBaseDecrease = false;
        boolean sawDataDecrease = false;
        try
        {
           p1.start();
           p2.start();

           Random rnd = new Random();
           TestingUtil.sleepThread(rnd.nextInt(200));

           int maxCountBase = 0;
           int maxCountData = 0;
           long start = System.currentTimeMillis();
           // Poll for at most 10 secs; a child count lower than the highest
           // one seen so far is taken as evidence that eviction ran.
           while ((System.currentTimeMillis() - start) < 10000)
           {
              parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
              children = parent.getChildren();
              if (children != null)
              {
                 int dataCount = children.size();
                 if (dataCount < maxCountData)
                 {
                    System.out.println("data " + dataCount + " < " + maxCountData + " elapsed = " + (System.currentTimeMillis() - start));
                    sawDataDecrease = true;
                 }
                 else
                 {
                    maxCountData = dataCount;
                 }
              }

              parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
              children = parent.getChildren();
              if (children != null)
              {
                 int baseCount = children.size();
                 if (baseCount < maxCountBase)
                 {
                    System.out.println("base " + baseCount + " < " + maxCountBase + " elapsed = " + (System.currentTimeMillis() - start));
                    sawBaseDecrease = true;
                 }
                 else
                 {
                    maxCountBase = baseCount;
                 }
              }

              if (sawDataDecrease && sawBaseDecrease)
              {
                 break;
              }

              TestingUtil.sleepThread(50);
           }
        }
        finally
        {
           // Always stop the writers, even when the monitoring loop throws;
           // otherwise the two non-daemon threads would keep spinning.
           p1.stopped = true;
           p2.stopped = true;
           p1.join(1000);
           p2.join(1000);
        }

        assertTrue("Saw data decrease", sawDataDecrease);
        assertTrue("Saw base decrease", sawBaseDecrease);
        assertNull("No exceptions in p1", p1.ex);
        assertNull("No exceptions in p2", p2.ex);

        // Sleep 5.1 secs so we are sure the eviction thread ran
        TestingUtil.sleepThread(5100);

        parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
        children = parent.getChildren();
        if (children != null)
        {
           System.out.println(children.size());
           assertTrue("Excess children evicted", children.size() <= 5);
        }
        parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
        children = parent.getChildren();
        if (children != null)
        {
           System.out.println(children.size());
           assertTrue("Excess children evicted", children.size() <= 25000);
        }

        // Sleep more to let the eviction thread run again,
        // which will evict all data nodes due to their ttl of 4 secs
        TestingUtil.sleepThread(8100);

        parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
        children = parent.getChildren();
        if (children != null)
        {
           assertEquals("All data children evicted", 0, children.size());
        }
     }
  
  
     /**
      * Cache user that, after a random pause, activates the A_B region on
      * its cache and writes a single node named after itself below /a/b.
      */
     private class CacheActivator extends CacheUser
     {

        CacheActivator(Semaphore semaphore, String name, boolean sync) throws Exception
        {
           super(semaphore, name, sync, false);
        }

        /**
         * Pauses via <code>TestingUtil.sleepRandom(5000)</code>, activates
         * the region and stores "VALUE" under "KEY" in /a/b/&lt;name&gt;.
         */
        void useCache() throws Exception
        {
           TestingUtil.sleepRandom(5000);
           createAndActivateRegion(cache, A_B);

           cache.put(Fqn.fromString("/a/b/" + name), "KEY", "VALUE");
        }

        /**
         * @param fqn node to read
         * @return whatever this user's cache holds under "KEY" for that node
         */
        public Object getCacheValue(Fqn fqn) throws CacheException
        {
           Object stored = cache.get(fqn, "KEY");
           return stored;
        }
     }
  
     /**
      * Cache user that activates the regions /a/0 .. /a/(regionCount - 1)
      * one after another, writing one node named after itself into each
      * region and pausing one second between regions.
      */
     private class StaggeredWebDeployerActivator extends CacheUser
     {

        // Field default; the constructor always replaces it.
        int regionCount = 15;

        StaggeredWebDeployerActivator(Semaphore semaphore, String name, boolean sync, int regionCount)
                throws Exception
        {
           super(semaphore, name, sync, false);
           this.regionCount = regionCount;
        }

        /**
         * For each region index: activate /a/&lt;index&gt;, store "VALUE"
         * under "KEY" in /a/&lt;index&gt;/&lt;name&gt;, then sleep 1 sec.
         */
        void useCache() throws Exception
        {
           int region = 0;
           while (region < regionCount)
           {
              String regionPath = "/a/" + region;
              createAndActivateRegion(cache, Fqn.fromString(regionPath));

              cache.put(Fqn.fromString(regionPath + "/" + name), "KEY", "VALUE");

              TestingUtil.sleepThread(1000);
              region++;
           }
        }

        /**
         * @param fqn node to read
         * @return whatever this user's cache holds under "KEY" for that node
         */
        public Object getCacheValue(Fqn fqn) throws CacheException
        {
           Object stored = cache.get(fqn, "KEY");
           return stored;
        }
     }
  
     /**
      * Cache user that continuously puts small integer values into nodes
      * of its own subtree, /&lt;name&gt;/0 .. /&lt;name&gt;/(SUBTREE_SIZE - 1),
      * until told to pause ({@link #stopPuts()}) or to finish
      * ({@link #stopThread()}).
      * <p/>
      * The control methods are called by the test while {@link #useCache()}
      * is looping on the stressor's own thread, so both control flags are
      * <code>volatile</code>; without that the loops are not guaranteed to
      * ever observe a change made by the other thread.
      */
     private class CacheStressor extends CacheUser
     {
        private final Random random = new Random(System.currentTimeMillis());
        // Written by the controlling thread, polled by useCache().
        private volatile boolean putsStopped = false;
        private volatile boolean stopped = false;

        CacheStressor(Semaphore semaphore,
                      String name,
                      boolean sync)
                throws Exception
        {
           super(semaphore, name, sync, true);
        }

        /**
         * Main loop: put until paused, report the last put, release one
         * semaphore ticket, then idle until resumed or stopped.
         *
         * @throws Exception if a semaphore ticket cannot be obtained within
         *                   60 seconds, or if a cache operation fails
         */
        void useCache() throws Exception
        {
           // Do continuous puts into the cache.  Use our own nodes,
           // as we're not testing conflicts between writer nodes,
           // just whether activation causes problems
           int factor = 0;
           int i = 0;
           Fqn fqn = null;

           boolean acquired = false;
           while (!stopped)
           {
              // No ticket is needed before the very first put; afterwards
              // each round of puts starts by taking one.
              if (i > 0)
              {
                 acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
                 if (!acquired)
                 {
                    throw new Exception(name + " cannot acquire semaphore");
                 }
              }

              while (!putsStopped)
              {
                 // factor is in [0, 50): its remainder picks the node, its
                 // quotient is the value, and it doubles as the pause in ms.
                 factor = random.nextInt(50);

                 fqn = Fqn.fromString("/" + name + "/" + String.valueOf(factor % SUBTREE_SIZE));
                 Integer value = factor / SUBTREE_SIZE;
                 cache.put(fqn, "KEY", value);

                 TestingUtil.sleepThread((long) factor);

                 i++;
              }

              System.out.println(name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));

              // Signal the controller that this round of puts has ended.
              semaphore.release();

              // Go to sleep until directed otherwise
              while (!stopped && putsStopped)
              {
                 TestingUtil.sleepThread((long) 100);
              }
           }
        }

        /** Asks the put loop to pause after the put currently in progress. */
        public void stopPuts()
        {
           putsStopped = true;
        }

        /** Allows the put loop to run (again). */
        public void startPuts()
        {
           putsStopped = false;
        }

        /** Asks the main loop to finish and interrupts the thread if it is still alive. */
        public void stopThread()
        {
           stopped = true;
           if (thread.isAlive())
           {
              thread.interrupt();
           }
        }


     }
  }
  
  



More information about the jboss-cvs-commits mailing list