Author: mircea.markus
Date: 2009-02-04 18:48:43 -0500 (Wed, 04 Feb 2009)
New Revision: 7648
Modified:
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java
Log:
re-enabled and simplified tests
Modified:
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java 2009-02-04
23:39:11 UTC (rev 7647)
+++
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java 2009-02-04
23:48:43 UTC (rev 7648)
@@ -12,10 +12,11 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.UnitTestCacheFactory;
import org.jboss.cache.Fqn;
-import org.jboss.cache.Node;
import org.jboss.cache.Region;
import org.jboss.cache.RegionImpl;
+import org.jboss.cache.eviction.LRUAlgorithmConfig;
import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.EvictionRegionConfig;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.factories.UnitTestConfigurationFactory;
@@ -27,7 +28,6 @@
import org.testng.annotations.Test;
import java.util.Random;
-import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -38,7 +38,7 @@
*
* @author <a href="mailto://brian.stansberry@jboss.com">Brian
Stansberry</a>
*/
-@Test(groups = "functional", enabled = false, testName =
"statetransfer.StateTransferConcurrencyTest")
+@Test(groups = "functional", testName =
"statetransfer.StateTransferConcurrencyTest")
public class StateTransferConcurrencyTest extends StateTransferTestBase
{
protected String getReplicationVersion()
@@ -48,18 +48,6 @@
/**
* Tests concurrent activation of the same subtree by multiple nodes in a
- * REPL_SYNC environment. The idea is to see what would happen with a
- * farmed deployment. See <code>concurrentActivationTest</code> for
details.
- *
- * @throws Exception
- */
- public void testConcurrentActivationSync() throws Exception
- {
- concurrentActivationTest(true);
- }
-
- /**
- * Tests concurrent activation of the same subtree by multiple nodes in a
* REPL_ASYNC environment. The idea is to see what would happen with a
* farmed deployment. See <code>concurrentActivationTest</code> for
details.
*
@@ -71,25 +59,23 @@
}
/**
- * Starts 5 caches and then concurrently activates the same region under
- * all 5, causing each to attempt a partial state transfer from the others.
+ * //todo - create a mvn profile and allow tests to run on more than 2 caches
+ * Starts 2 caches and then concurrently activates the same region under
+ * all 2, causing each to attempt a partial state transfer from the other.
* As soon as each cache has activated its region, it does a put to a node
- * in the region, thus complicating the lives of the other caches trying
+ * in the region, thus complicating the lives of the other cache trying
* to get partial state.
* <p/>
* Failure condition is if any node sees an exception or if the final state
* of all caches is not consistent.
- *
- * @param sync use REPL_SYNC or REPL_ASYNC
- * @throws Exception
*/
private void concurrentActivationTest(boolean sync)
{
- String[] names = {"A", "B", "C", "D",
"E"};
+ String[] names = {"A", "B"};
int count = names.length;
CacheActivator[] activators = new CacheActivator[count];
-
+ long start = System.currentTimeMillis();
try
{
// Create a semaphore and take all its tickets
@@ -100,7 +86,7 @@
CacheSPI[] caches = new CacheSPI[count];
for (int i = 0; i < count; i++)
{
- activators[i] = new CacheActivator(semaphore, names[i], sync, caches);
+ activators[i] = new CacheActivator(semaphore, names[i], sync);
caches[i] = activators[i].getCacheSPI();
activators[i].start();
}
@@ -128,6 +114,8 @@
waitTillAllReplicationsFinish(count, caches);
}
+ System.out.println("System.currentTimeMillis()-st = " +
(System.currentTimeMillis()-start));
+
// Ensure the caches held by the activators see all the values
for (int i = 0; i < count; i++)
{
@@ -193,19 +181,19 @@
private void concurrentActivationTest2(boolean sync)
{
String[] names = {"A", "B"};
- int count = names.length;
- int regionsToActivate = 15;
- int sleepTimeBetweenNodeStarts = 10000;
- StaggeredWebDeployerActivator[] activators = new
StaggeredWebDeployerActivator[count];
+ int cacheCount = names.length;
+ int regionsToActivate = 3;
+ int sleepTimeBetweenNodeStarts = 1000;
+ StaggeredWebDeployerActivator[] activators = new
StaggeredWebDeployerActivator[cacheCount];
try
{
// Create a semaphore and take all its tickets
- Semaphore semaphore = new Semaphore(count);
- semaphore.acquire(count);
+ Semaphore semaphore = new Semaphore(cacheCount);
+ semaphore.acquire(cacheCount);
// Create activation threads that will block on the semaphore
- CacheSPI[] caches = new CacheSPI[count];
- for (int i = 0; i < count; i++)
+ CacheSPI[] caches = new CacheSPI[cacheCount];
+ for (int i = 0; i < cacheCount; i++)
{
activators[i] = new StaggeredWebDeployerActivator(semaphore, names[i], sync,
regionsToActivate);
caches[i] = activators[i].getCacheSPI();
@@ -225,7 +213,7 @@
// Reacquire the semaphore tickets; when we have them all
// we know the threads are done
- for (int i = 0; i < count; i++)
+ for (int i = 0; i < cacheCount; i++)
{
boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
if (!acquired)
@@ -237,11 +225,11 @@
// Sleep to allow any async calls to clear
if (!sync)
{
- waitTillAllReplicationsFinish(count, caches);
+ waitTillAllReplicationsFinish(cacheCount, caches);
}
// Ensure the caches held by the activators see all the values
- for (int i = 0; i < count; i++)
+ for (int i = 0; i < cacheCount; i++)
{
Exception aException = activators[i].getException();
boolean gotUnexpectedException = aException != null
@@ -266,7 +254,7 @@
}
finally
{
- for (int i = 0; i < count; i++)
+ for (int i = 0; i < cacheCount; i++)
{
activators[i].cleanup();
}
@@ -348,7 +336,7 @@
*/
private void concurrentUseTest(boolean sync) throws Exception
{
- String[] names = {"B", "C", "D", "E"};
+ String[] names = {"B"};
int count = names.length;
CacheStressor[] stressors = new CacheStressor[count];
@@ -467,29 +455,23 @@
*/
public void testEvictionSeesStateTransfer() throws Exception
{
- Configuration c =
UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
+ Configuration c =
UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, false);
additionalConfiguration(c);
Cache<Object, Object> cache1 = new UnitTestCacheFactory<Object,
Object>().createCache(c, getClass());
caches.put("evict1", cache1);
-
cache1.put(Fqn.fromString("/a/b/c"), "key",
"value");
c = UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
additionalConfiguration(c);
+ c.getEvictionConfig().setWakeupInterval(-1);
Cache<Object, Object> cache2 = new UnitTestCacheFactory<Object,
Object>().createCache(c, getClass());
caches.put("evict2", cache2);
RegionImpl region = (RegionImpl) cache2.getRegion(Fqn.ROOT, false);
// We expect a VISIT event for / and ADD events for /a, /a/b and /a/b/c
int nodeEventQueueSize = region.getEvictionEventQueue().size();
- int i = 0;
- int events = nodeEventQueueSize;
- while (events > 0)
- {
- events = region.getEvictionEventQueue().size();
- }
boolean mvcc = cache2.getConfiguration().getNodeLockingScheme() ==
NodeLockingScheme.MVCC;
- assertEquals("Saw the expected number of node events", mvcc ? 5 : 3,
nodeEventQueueSize);
+ assertEquals("Saw the expected number of node events", mvcc ? 6 : 4,
nodeEventQueueSize);
}
/**
@@ -497,183 +479,44 @@
*/
public void testEvictionAfterStateTransfer() throws Exception
{
- Configuration c =
UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
+ Configuration c =
UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, false);
additionalConfiguration(c);
Cache<Object, Object> cache1 = new UnitTestCacheFactory<Object,
Object>().createCache(c, getClass());
caches.put("evict1", cache1);
- for (int i = 0; i < 25000; i++)
+ for (int i = 0; i < 10; i++)
{
- cache1.put(Fqn.fromString("/org/jboss/data/" + i), "key",
"base" + i);
- if (i < 5)
- {
- cache1.put(Fqn.fromString("/org/jboss/test/data/" + i),
"key", "data" + i);
- if (i == 0)
- {
-
cache1.getRoot().getChild(Fqn.fromString("/org/jboss/data")).setResident(true);
//so that it won't be counted for eviction
- }
- }
+ cache1.put(Fqn.fromString("/org/jboss/test/data/" + i),
"key", "data" + i);
}
- EvictionController ec1 = new EvictionController(cache1);
- ec1.startEviction();
- int childrenSize =
cache1.getRoot().getChild(Fqn.fromString("/org/jboss/data")).getChildren().size();
- assert childrenSize == 5000 : "Expected 5000, saw " + childrenSize;
+ assert
cache1.getRoot().getChild(Fqn.fromString("/org/jboss/test/data/")).getChildren().size()
== 10;
c = UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
+ c.getEvictionConfig().setWakeupInterval(-1);
+ EvictionRegionConfig evictionRegionConfig =
c.getEvictionConfig().getEvictionRegionConfig("/org/jboss/test/data");
+ LRUAlgorithmConfig evictionAlgorithmConfig = (LRUAlgorithmConfig)
evictionRegionConfig.getEvictionAlgorithmConfig();
+ evictionAlgorithmConfig.setTimeToLive(-1);
additionalConfiguration(c);
final Cache<Object, Object> cache2 = new UnitTestCacheFactory<Object,
Object>().createCache(c, getClass());
+ EvictionController ec2 = new EvictionController(cache2);
caches.put("evict2", cache2);
- Node<Object, Object> parent;// =
cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
- parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/data"));
- Set children = parent.getChildren();
- //4999 because the root of the region will also be counted, as it is not resident
- assertTrue("Minimum number of base children transferred", children.size()
>= 4999);
-
- // Sleep 2.5 secs so the nodes we are about to create in data won't
- // exceed the 4 sec TTL when eviction thread runs
- TestingUtil.sleepThread(2500);
-
- class Putter extends Thread
- {
- Cache<Object, Object> cache = null;
- boolean stopped = false;
- Exception ex = null;
-
- public void run()
- {
- int i = 25000;
- while (!stopped)
- {
- try
- {
- cache.put(Fqn.fromString("/org/jboss/data/" + i),
"key", "base" + i);
- cache.put(Fqn.fromString("/org/jboss/test/data/" + i),
"key", "data" + i);
- i++;
- }
- catch (Exception e)
- {
- ex = e;
- }
- }
- }
- }
- Putter p1 = new Putter();
- p1.cache = cache1;
- p1.start();
- Putter p2 = new Putter();
- p2.cache = cache2;
- p2.start();
-
- Random rnd = new Random();
- TestingUtil.sleepThread(rnd.nextInt(200));
-
- int maxCountBase = 0;
- int maxCountData = 0;
- boolean sawBaseDecrease = false;
- boolean sawDataDecrease = false;
- long start = System.currentTimeMillis();
- Node root = cache2.getRoot();
- while ((System.currentTimeMillis() - start) < 10000)
- {
- parent = root.getChild(Fqn.fromString("/org/jboss/test/data"));
- children = parent.getChildren();
- if (children != null)
- {
- int dataCount = children.size();
- if (dataCount < maxCountData)
- {
- sawDataDecrease = true;
- }
- else
- {
- maxCountData = dataCount;
- }
- }
-
- parent =
cache2.getRoot().getChild(Fqn.fromString("/org/jboss/data"));
- children = parent.getChildren();
- if (children != null)
- {
- int baseCount = children.size();
- if (baseCount < maxCountBase)
- {
- sawBaseDecrease = true;
- }
- else
- {
- maxCountBase = baseCount;
- }
- }
-
- if (sawDataDecrease && sawBaseDecrease)
- {
- break;
- }
-
- TestingUtil.sleepThread(50);
- }
-
- p1.stopped = true;
- p2.stopped = true;
- p1.join(1000);
- p2.join(1000);
-
- assertTrue("Saw data decrease", sawDataDecrease);
- assertTrue("Saw base decrease", sawBaseDecrease);
- assertNull("No exceptions in p1", p1.ex);
- assertNull("No exceptions in p2", p2.ex);
-
- EvictionController ec2 = new EvictionController(cache2);
+ assert
cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data/")).getChildren().size()
== 10;
ec2.startEviction();
-
- parent =
cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
- children = parent.getChildren();
- if (children != null)
- {
- assertTrue("Excess children evicted", children.size() <= 5);
- }
- parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/data"));
- children = parent.getChildren();
- if (children != null)
- {
- assertTrue("Excess children evicted", children.size() <= 25000);
- }
-
- // Sleep more to let the eviction thread run again,
- // which will evict all data nodes due to their ttl of 4 secs
- ec2.evictRegionWithTimeToLive("/org/jboss/test/data");
-
- parent =
cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
- if (parent != null)
- {
- children = parent.getChildren();
- if (children != null)
- {
- assertEquals("All data children evicted", 0, children.size());
- }
- }
+ assert
cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data/")).getChildren().size()
== 5;
}
private class CacheActivator extends CacheUser
{
-
- private CacheSPI[] caches;
-
- CacheActivator(Semaphore semaphore,
- String name,
- boolean sync, CacheSPI[] caches)
- throws Exception
+ CacheActivator(Semaphore semaphore, String name, boolean sync) throws Exception
{
super(semaphore, name, sync, false, 120000);
- this.caches = caches;
}
@SuppressWarnings("unchecked")
void useCache() throws Exception
{
- TestingUtil.sleepRandom(5000);
+ TestingUtil.sleepRandom(500);
createAndActivateRegion(cache, A_B);
Fqn childFqn = Fqn.fromRelativeElements(A_B, name);
cache.put(childFqn, "KEY", "VALUE");
@@ -690,11 +533,7 @@
int regionCount = 15;
- StaggeredWebDeployerActivator(Semaphore semaphore,
- String name,
- boolean sync,
- int regionCount)
- throws Exception
+ StaggeredWebDeployerActivator(Semaphore semaphore, String name, boolean sync, int
regionCount) throws Exception
{
super(semaphore, name, sync, false);
this.regionCount = regionCount;
@@ -705,11 +544,8 @@
for (int i = 0; i < regionCount; i++)
{
createAndActivateRegion(cache, Fqn.fromString("/a/" + i));
-
Fqn childFqn = Fqn.fromString("/a/" + i + "/" + name);
cache.put(childFqn, "KEY", "VALUE");
-
- TestingUtil.sleepThread(1000);
}
}