[jboss-svn-commits] JBL Code SVN: r31422 - in labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance: jdbc and 2 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Feb 5 03:43:17 EST 2010
Author: whitingjr
Date: 2010-02-05 03:43:17 -0500 (Fri, 05 Feb 2010)
New Revision: 31422
Modified:
labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiResourceTest.java
labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiThreadedTest.java
labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/WarmedUpTest.java
labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTask.java
labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTest.java
labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeResourcesTest.java
labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeTask.java
labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/task/RecursiveTask.java
Log:
Re-engineered the testing framework. Refactored the recursive behaviour out of concrete test cases.
Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiResourceTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiResourceTest.java 2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiResourceTest.java 2010-02-05 08:43:17 UTC (rev 31422)
@@ -22,45 +22,15 @@
package org.jboss.jbossts.performance;
-import javax.persistence.EntityManager;
-import javax.transaction.Synchronization;
-
-import org.apache.log4j.Logger;
-import org.hibernate.Transaction;
-
-
-
+/**
+ * This object changes the default behaviour. Changing this here ensures the test framework
+ * sets up a 2nd resource with default table data and settings.
+ * @author whitingjr
+ *
+ */
public abstract class MultiResourceTest extends WarmedUpTest
{
- /*
- public class SessionSynchronization implements Synchronization
- {
- private final Logger logger = Logger.getLogger(SessionSynchronization.class);
- private EntityManager manager;
- private boolean closeAtTxCompletion;
-
- public SessionSynchronization(EntityManager session, Transaction tx, boolean close)
- {
- this.manager = session;
- closeAtTxCompletion = close;
- }
-
- public void beforeCompletion()
- {
- }
-
- public void afterCompletion(int status)
- {
- if (closeAtTxCompletion)
- {
- logger.debug("************** closing entity managersession **************");
- manager.close();
- }
- }
- }
- */
-
- @Override
+ @Override
protected boolean isMultiResourceTest()
{
return false;
Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiThreadedTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiThreadedTest.java 2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiThreadedTest.java 2010-02-05 08:43:17 UTC (rev 31422)
@@ -4,12 +4,18 @@
import java.util.concurrent.Executor;
import org.apache.log4j.Logger;
+import org.jboss.jbossts.performance.configuration.MultithreadedTestConfiguration;
+import org.jboss.jbossts.performance.configuration.TestConfiguration;
public abstract class MultiThreadedTest extends MultiResourceTest
{
private static final Logger logger = Logger.getLogger(MultiThreadedTest.class);
+ /**
+ * Use this method to start concurrent executions of the test. This should be used for
+ * initial warm up run of the compiler and stable runs.
+ */
@Override
- public void startConcurrentExecutions(int recurseCount, int threadCount, final int profiledCount)
+ public void startConcurrentExecutions(final TestConfiguration configuration)
throws Exception
{
/**
@@ -18,29 +24,30 @@
* until all workers have finished. The barrier is triggered and all fall out
* of method frame.
* This thread is included in the barrier to ensure this thread
- * remains active during all demon threads during processing.
+ * remains active during all spawned threads are processing.
*/
- CyclicBarrier completionBarrier = new CyclicBarrier(1 + threadCount);
+ CyclicBarrier completionBarrier = new CyclicBarrier(1 + configuration.getThreadCount()); // this shared between threads
- for (int count = 1; count <= threadCount; count += 1)
+ for (int count = 1; count <= configuration.getThreadCount(); count += 1)
{
-
Executor executor = new ThreadPerTestWorkerExecutor();
- executor.execute(getTask( recurseCount, completionBarrier, count));
+ /* Here we use the count value as the threadIdentity. This allows each thread to
+ * use (read/write) a record in the database that does not overlap with other threads.
+ * Avoiding deadlocking in the database. */
+ executor.execute(getTask( new MultithreadedTestConfiguration(configuration.getIterationCount(), completionBarrier, new Long(count)) ));
}
completionBarrier.await();// start all the threads processing
-
+
logger.info("All threads have started warmup.");
if (!completionBarrier.isBroken())
- {
- completionBarrier.await();// all threads fall out of method frame
+ {// good, worked so far, now wait for completion of tests
+ completionBarrier.await();// all threads fall out, either by assertion error or all iterations completing
// wait for all to finish profiled run
- logger.info("All threads have completed warmup run.");
+ logger.info("All threads have completed run.");
}
else
{
- logger.warn("The controlling test class did not await for child threads. The barrier was broken.");
+ logger.warn("The controlling test class did not await for child threads. The co-ordinating barrier was broken.");
}
-
}
}
Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/WarmedUpTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/WarmedUpTest.java 2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/WarmedUpTest.java 2010-02-05 08:43:17 UTC (rev 31422)
@@ -28,6 +28,8 @@
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;
+import org.jboss.jbossts.performance.configuration.MultithreadedTestConfiguration;
+import org.jboss.jbossts.performance.configuration.TestConfiguration;
import org.testng.annotations.Parameters;
import org.testng.annotations.Test;
@@ -52,7 +54,7 @@
throws Exception
{
NDC.push("warmup");
- startConcurrentExecutions(Integer.parseInt(warmupCountConfig), Integer.parseInt(threadCountConfig), Integer.parseInt(profiledCountConfig));
+ startConcurrentExecutions(new TestConfiguration(Integer.parseInt(warmupCountConfig), Integer.parseInt(threadCountConfig), Long.parseLong(profiledCountConfig)));
NDC.remove();
}
@@ -71,7 +73,7 @@
{
logger.warn(e.getMessage());
}
- startConcurrentExecutions(Integer.parseInt(testCountConfig), Integer.parseInt(concurrentCountConfig), Integer.parseInt(profiledCountConfig));
+ startConcurrentExecutions(new TestConfiguration(Integer.parseInt(testCountConfig), Integer.parseInt(concurrentCountConfig), Long.parseLong(profiledCountConfig)));
try
{
getProfiler().stop();
@@ -89,9 +91,9 @@
* @param threadCount
* @throws Exception
*/
- public abstract void startConcurrentExecutions(final int recurseCount,final int threadCount,final int profiledCount) throws Exception;
+ public abstract void startConcurrentExecutions(final TestConfiguration configuration) throws Exception;
- public abstract Runnable getTask(int recurseCount, CyclicBarrier completionBarrier, int threadId) throws Exception;
+ public abstract Runnable getTask(final MultithreadedTestConfiguration taskConfiguration) throws Exception;
public final class ThreadPerTestWorkerExecutor implements Executor
{
Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTask.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTask.java 2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTask.java 2010-02-05 08:43:17 UTC (rev 31422)
@@ -22,115 +22,28 @@
package org.jboss.jbossts.performance.jdbc;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Query;
import javax.transaction.Synchronization;
import org.apache.log4j.Logger;
-import org.apache.log4j.NDC;
import org.hibernate.Transaction;
import org.hibernate.ejb.EntityManagerImpl;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.profiler.Profiler;
+import org.jboss.jbossts.performance.task.RecursiveTask;
import org.jboss.tm.TransactionLocal;
import org.testng.Assert;
-public class JDBCTask implements Runnable
+public class JDBCTask extends RecursiveTask
{
- private final EntityManagerFactory emfA;
- private final EntityManagerFactory emfB;
- private final boolean isOptionalWriteEnabled;
- private final int recurseCount;
- private final Logger logger = Logger.getLogger(JDBCTask.class);
- private final CyclicBarrier completionBarrier;
- private final Long threadId;
- private final CountDownLatch latch;
- private final Stack parentNDC;
- public JDBCTask(final EntityManagerFactory factoryA, final EntityManagerFactory factoryB, final int count, final CyclicBarrier barrier, final int id, final boolean isWEnabled, Stack stack)
+
+ public JDBCTask(final ConcurrentTaskConfiguration taskConfiguration)
{
- this.emfA = factoryA;
- this.emfB = factoryB;
- this.recurseCount = count;
- this.completionBarrier = barrier;
- this.threadId = new Long(id);
- this.isOptionalWriteEnabled = isWEnabled;
- this.latch = null;
- this.parentNDC = stack;
+ super(taskConfiguration);
}
-
- public JDBCTask(final EntityManagerFactory factoryA, final EntityManagerFactory factoryB, final int count, final CyclicBarrier barrier, final int id, final boolean isWEnabled, final CountDownLatch countDown, Stack stack)
- {
- this.emfA = factoryA;
- this.emfB = factoryB;
- this.recurseCount = count;
- this.completionBarrier = barrier;
- this.threadId = new Long(id);
- this.isOptionalWriteEnabled = isWEnabled;
- this.latch = countDown;
- this.parentNDC = stack;
- }
-
-
- @Override
- public void run()
- {
- try
- {/* use the barrier to wait for all concurrent threads, when finished decrement the latch and fall out of
- method. Means the profiler does not
- */
- NDC.inherit(this.parentNDC);
- NDC.push(this.threadId.toString());
- this.completionBarrier.await();// await all threads ready to start
- recurse(this.recurseCount);
- logger.info("Finished recursing. Awaiting on synch point.");
- if (!this.completionBarrier.isBroken())
- {// fall out of thread without waiting.
- this.completionBarrier.await();// await all threads have finished
- }
- }
- catch (Exception e)
- {
- logger.error(e.getMessage(), e);
- this.completionBarrier.reset();
- Assert.fail(e.getMessage());
- }
- catch (AssertionError ae)
- {
- this.completionBarrier.reset();
- throw ae;
- }
- finally
- {
- NDC.clear();
- }
- }
-
- /**
- * Run the test sequentially.
- *
- * @param count
- */
- private void recurse(final int count)
- {
- for (int i = 1; i <= count; i += 1)
- {
- try
- {
- task(i);
- logger.debug("Completed profiled run of testbody for thread ["+this.threadId.toString()+"] count ["+i+"].");
-
- } catch (Exception e)
- {
- this.completionBarrier.reset();
- logger.error("Failed profiled run of testbody for thread ["+this.threadId.toString()+"] count ["+i+"].");
- Assert.fail(e.getMessage(), e);
- }
- }
- }
public void task(final int iteration)
throws Exception
{
@@ -146,11 +59,9 @@
transaction.registerSynchronization(new SessionSynchronization(emA, transaction, true));
transaction.registerSynchronization(new SessionSynchronization( emB, transaction, true ) );
- //Object obj = emA.createNamedQuery("findUserById").setParameter("identifier", this.threadId).getSingleResult();
session.set(emA);
Object[] usrA = (Object[])findUsingResourceA(emA, readSQL);//discard details
Number version = (Number ) usrA[12];
- //logger.debug(usrA.toString());
long time = System.currentTimeMillis();
String writeSQLResA = String.format("update USERS set DEFAULT_BILLING_DETAILS_ID=%1$d, EMAIL='%2$s', FIRSTNAME='%3$s', HOME_CITY='%4$s', HOME_STREET='%5$s', HOME_ZIPCODE='%6$s', LASTNAME='%7$s', PASSWD='%8$s', RANK=%9$d, USERNAME='%10$s', OBJ_VERSION=%11$d where USER_ID=%12$d and OBJ_VERSION=%13$d",
@@ -169,10 +80,13 @@
null, "anotheruser at mail.tld", "Ben"+time, "Foocity", "Foostreet", "22222", "User", "secret", 0, "anotheruser110"+this.threadId, version.intValue()+1, this.threadId, version.intValue());
writeUserB(emB, writeSQLResB);
- //resourceAFlush(emA); flush not necessary
- //resourceBFlush(emB); flush not necessary
emA.getSession().getTransaction().commit();
Assert.assertFalse(emA.isOpen());
+ boolean trip = false;
+ if (trip)
+ {
+ Profiler.
+ }
}
private void writeUserA(final EntityManager em, final String sql)
{
Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTest.java 2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTest.java 2010-02-05 08:43:17 UTC (rev 31422)
@@ -26,6 +26,8 @@
import org.apache.log4j.NDC;
import org.jboss.jbossts.performance.MultiThreadedTest;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.configuration.MultithreadedTestConfiguration;
/**
* This class creates an instance of the underlying task.
@@ -36,9 +38,9 @@
public class JDBCTest extends MultiThreadedTest
{
@Override
- public Runnable getTask(int recurseCount, CyclicBarrier completionBarrier, int threadId) throws Exception
+ public Runnable getTask(final MultithreadedTestConfiguration taskConfiguration) throws Exception
{
- return new JDBCTask(getEntityManagerFactory(), getEntityManagerFactoryB(), recurseCount, completionBarrier, threadId, this.isOptionalWriteEnabled, NDC.cloneStack());
+ return new JDBCTask(new ConcurrentTaskConfiguration(getEntityManagerFactory(), getEntityManagerFactoryB(), taskConfiguration.getIterationCount(), taskConfiguration.getCompletionBarrier(), this.isOptionalWriteEnabled, NDC.cloneStack(), taskConfiguration.getThreadIdentity()));
}
@Override
protected boolean isMultiResourceTest()
Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeResourcesTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeResourcesTest.java 2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeResourcesTest.java 2010-02-05 08:43:17 UTC (rev 31422)
@@ -22,18 +22,23 @@
package org.jboss.jbossts.performance.resource;
-import java.util.concurrent.CyclicBarrier;
-
import org.apache.log4j.NDC;
import org.jboss.jbossts.performance.MultiThreadedTest;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.configuration.MultithreadedTestConfiguration;
+/**
+ * This is in all intents and purposes has ended up being a factory.
+ * @author whitingjr
+ *
+ */
public class SynchronizeResourcesTest extends MultiThreadedTest
{
@Override
- public Runnable getTask(final int recurseCount, final CyclicBarrier completionBarrier, final int id)
+ public Runnable getTask(final MultithreadedTestConfiguration taskConfiguration)
throws Exception
{
- return new SynchronizeTask(getEntityManagerFactory(), getEntityManagerFactoryB(), recurseCount, completionBarrier, id, this.isOptionalWriteEnabled, NDC.cloneStack());
+ return new SynchronizeTask(new ConcurrentTaskConfiguration( getEntityManagerFactory(), getEntityManagerFactoryB(), taskConfiguration.getIterationCount(), taskConfiguration.getCompletionBarrier(), this.isOptionalWriteEnabled, NDC.cloneStack(), taskConfiguration.getThreadIdentity()));
}
@Override
protected boolean isMultiResourceTest()
Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeTask.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeTask.java 2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeTask.java 2010-02-05 08:43:17 UTC (rev 31422)
@@ -1,18 +1,15 @@
package org.jboss.jbossts.performance.resource;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Query;
import javax.transaction.Synchronization;
import org.apache.log4j.Logger;
-import org.apache.log4j.NDC;
import org.hibernate.Transaction;
import org.hibernate.ejb.EntityManagerImpl;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.task.RecursiveTask;
import org.jboss.tm.TransactionLocal;
import org.testng.Assert;
@@ -21,82 +18,30 @@
/**
- * This is the task that will.
- * A SynchronizeTask.
+ * This is the task that will read and write changes to two separate databases.
+ * The implementation uses an ORM to handle the find and write operations. JTA
+ * is used to co-ordinate a distributed transaction.
+ * The implementation is a little clunky but it does what's necessary. The requirement
+ * was to avoid depending on using an EJB container and use the Hibernate APIs and internals
+ * to switch between native and xa functionality without code changes.
*
* @author <a href="jwhiting at redhat.com">Jeremy Whiting</a>
* @version $Revision: 1.1 $
*/
-public class SynchronizeTask implements Runnable
+public class SynchronizeTask extends RecursiveTask
{
- private final EntityManagerFactory emfA;
- private final EntityManagerFactory emfB;
- private final boolean isOptionalWriteEnabled;
- private final int recurseCount;
- private final static Logger logger = Logger.getLogger(SynchronizeTask.class);
- private final CyclicBarrier completionBarrier;
- private final Long threadId;
- private final CountDownLatch latch;
- private final Stack parentNDC;
- public SynchronizeTask(final EntityManagerFactory factoryA, final EntityManagerFactory factoryB, final int count, final CyclicBarrier barrier, final int id, final boolean isWEnabled, Stack stack)
- {
- this.emfA = factoryA;
- this.emfB = factoryB;
- this.recurseCount = count;
- this.completionBarrier = barrier;
- this.threadId = new Long(id);
- this.isOptionalWriteEnabled = isWEnabled;
- this.latch = null;
- this.parentNDC = stack;
- }
- public SynchronizeTask(final EntityManagerFactory factoryA, final EntityManagerFactory factoryB, final int count, final CyclicBarrier barrier, final int id, final boolean isWEnabled, final CountDownLatch countDown, Stack stack)
+ public SynchronizeTask(final ConcurrentTaskConfiguration taskConfiguration)
{
- this.emfA = factoryA;
- this.emfB = factoryB;
- this.recurseCount = count;
- this.completionBarrier = barrier;
- this.threadId = new Long(id);
- this.isOptionalWriteEnabled = isWEnabled;
- this.latch = countDown;
- this.parentNDC = stack;
+ super(taskConfiguration);
}
- @Override
- public void run()
- {
- try
- {/* use the barrier to wait for all concurrent threads, when finished decrement the latch and fall out of
- method.
- */
- NDC.inherit(this.parentNDC);
- NDC.push(this.threadId.toString());
- this.completionBarrier.await();// await all threads ready to start
- recurse(this.recurseCount);
- logger.info("Finished recursing. Awaiting on synch point.");
- if (!this.completionBarrier.isBroken())
- {// fall out of thread without waiting.
- this.completionBarrier.await();// await all threads have finished
- }
- }
- catch (Exception e)
- {
- logger.error(e.getMessage(), e);
- this.completionBarrier.reset();
- Assert.fail(e.getMessage());
- }
- catch (AssertionError ae)
- {
- this.completionBarrier.reset();
- throw ae;
- }
- finally
- {
- NDC.clear();
- }
- }
-
- public void task()
+ /**
+ * This is the task which performs a read and (optional) write to the resource.
+ * Uses ORM to handle entity bean modification and persistence of changes.
+ */
+ @Override
+ public void task(final int iteration)
throws Exception
{
EntityManagerImpl emA = null;
@@ -122,7 +67,6 @@
userDAO_B.setEntityManagerB(emB);
// Prepare a user object
- //User user = userDAO_A.findById(this.threadId, false);
session.set(emA);
User userA = findUsingResourceA(userDAO_A);
@@ -142,31 +86,8 @@
resourceAFlush(emA);
resourceBFlush(emB);
- //emA.getSession().setReadOnly(user, true);
emA.getSession().getTransaction().commit();
}
- /**
- * Run the test sequentially.
- *
- * @param count
- */
- private void recurse(final int count)
- {
- for (int i = 0; i < count; i += 1)
- {
- try
- {
- task();
- logger.debug("Completed profiled run of testbody for thread ["+this.threadId.toString()+"] count ["+i+"].");
-
- } catch (Exception e)
- {
- this.completionBarrier.reset();
- logger.error("Failed profiled run of testbody for thread ["+this.threadId.toString()+"] count ["+i+"].");
- Assert.fail(e.getMessage(), e);
- }
- }
- }
public EntityManagerFactory getEMFactoryA()
{
@@ -178,6 +99,11 @@
return this.emfB;
}
+ /**
+ * This class provides a synchronziation of the second resource when the first commits.
+ * @author whitingjr
+ *
+ */
public class SessionSynchronization implements Synchronization
{
private final Logger logger = Logger.getLogger(SessionSynchronization.class);
@@ -192,6 +118,7 @@
public void beforeCompletion()
{
+// this.manager.flush();
}
public void afterCompletion(int status)
Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/task/RecursiveTask.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/task/RecursiveTask.java 2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/task/RecursiveTask.java 2010-02-05 08:43:17 UTC (rev 31422)
@@ -22,11 +22,105 @@
package org.jboss.jbossts.performance.task;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+
+import javax.persistence.EntityManagerFactory;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.resource.SynchronizeTask;
+import org.testng.Assert;
+
/**
* The purpose of this object is to provide recursive behaviour for a test case task.
- * Repeatedly call the
+ * Repeatedly calling the body of a test case. The test case method is task rather than
+ * using the @Test annotation.
+ * This class also provides disabling of profiling for a thread. This means profiling
+ *
*/
-public class RecursiveTask
+public abstract class RecursiveTask implements Runnable
{
+ protected final EntityManagerFactory emfA;
+ protected final EntityManagerFactory emfB;
+ protected final boolean isOptionalWriteEnabled;
+ private final int recurseCount;
+ private final static Logger logger = Logger.getLogger(SynchronizeTask.class);
+ private final CyclicBarrier completionBarrier;
+ //private final CountDownLatch latch;
+ protected final Long threadId;
+ private final Stack parentNDC;
+ private final ConcurrentTaskConfiguration taskConfiguration;
+ public RecursiveTask(final ConcurrentTaskConfiguration configuration)
+ {
+ this.emfA = configuration.getEmfactoryA();
+ this.emfB = configuration.getEmfactoryB();
+ this.recurseCount = configuration.getIterationCount();
+ this.completionBarrier = configuration.getCompletionBarrier();
+ this.isOptionalWriteEnabled = configuration.isOptionalWriteEnabled();
+ //this.latch = null;
+ this.parentNDC = configuration.getNDCstack();
+ this.threadId = configuration.getThreadIdentity();
+ this.taskConfiguration = configuration;
+ }
+ @Override
+ public void run()
+ {
+ try
+ {/* use the barrier to wait for all concurrent threads, when finished decrement the latch and fall out of
+ method.
+ */
+ NDC.inherit(this.parentNDC);
+ NDC.push(this.threadId.toString());
+ this.completionBarrier.await();// await all threads ready to start
+ recurse(this.recurseCount);
+ logger.info("Finished recursing. Awaiting on synch point.");
+ if (!this.completionBarrier.isBroken())
+ {// fall out of thread without waiting.
+ this.completionBarrier.await();// await all threads have finished
+ }
+ }
+ catch (Exception e)
+ {
+ logger.error(e.getMessage(), e);
+ this.completionBarrier.reset();
+ Assert.fail(e.getMessage());
+ }
+ catch (AssertionError ae)
+ {
+ this.completionBarrier.reset();
+ throw ae;
+ }
+ finally
+ {
+ NDC.clear();
+ }
+ }
+ protected abstract void task(final int iteration) throws Exception;
+
+ /**
+ * Run the test sequentially.
+ *
+ * @param count
+ */
+ private void recurse(final int count)
+ {
+ for (int i = 0; i < count; i += 1)
+ {
+ try
+ {
+ task(i);
+ logger.debug(String.format("Completed profiled run of testbody for thread [%1$d] count [%2$d].", this.threadId, i));
+
+ } catch (Exception e)
+ {
+ this.completionBarrier.reset();
+ logger.error(String.format("Failed profiled run of testbody for thread [%1$d] count [$2$d].", this.threadId, i ));
+ Assert.fail(e.getMessage(), e);
+ }
+ }
+ }
}
More information about the jboss-svn-commits
mailing list