[jboss-svn-commits] JBL Code SVN: r31422 - in labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance: jdbc and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Feb 5 03:43:17 EST 2010


Author: whitingjr
Date: 2010-02-05 03:43:17 -0500 (Fri, 05 Feb 2010)
New Revision: 31422

Modified:
   labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiResourceTest.java
   labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiThreadedTest.java
   labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/WarmedUpTest.java
   labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTask.java
   labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTest.java
   labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeResourcesTest.java
   labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeTask.java
   labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/task/RecursiveTask.java
Log:
Re-engineered the testing framework. Refactored the recursive behaviour out of concrete test cases.

Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiResourceTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiResourceTest.java	2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiResourceTest.java	2010-02-05 08:43:17 UTC (rev 31422)
@@ -22,45 +22,15 @@
 
 package org.jboss.jbossts.performance;
 
-import javax.persistence.EntityManager;
-import javax.transaction.Synchronization;
-
-import org.apache.log4j.Logger;
-import org.hibernate.Transaction;
-
-
-
+/**
+ * This object changes the default behaviour. Changing this here ensures the test framework
+ * sets up a 2nd resource with default table data and settings.
+ * @author whitingjr
+ *
+ */
 public abstract class MultiResourceTest extends WarmedUpTest
 {
-   /*
-   public class SessionSynchronization implements Synchronization
-   {
-      private final Logger logger = Logger.getLogger(SessionSynchronization.class);
-      private EntityManager manager;
-      private boolean closeAtTxCompletion;
-
-      public SessionSynchronization(EntityManager session, Transaction tx, boolean close)
-      {
-         this.manager = session;
-         closeAtTxCompletion = close;
-      }
-
-      public void beforeCompletion()
-      {
-      }
-
-      public void afterCompletion(int status)
-      {
-         if (closeAtTxCompletion)
-         {
-            logger.debug("************** closing entity managersession **************");
-            manager.close();
-         }
-      }
-   }
-   */
-
-   @Override
+      @Override
    protected boolean isMultiResourceTest()
    {
       return false;

Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiThreadedTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiThreadedTest.java	2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/MultiThreadedTest.java	2010-02-05 08:43:17 UTC (rev 31422)
@@ -4,12 +4,18 @@
 import java.util.concurrent.Executor;
 
 import org.apache.log4j.Logger;
+import org.jboss.jbossts.performance.configuration.MultithreadedTestConfiguration;
+import org.jboss.jbossts.performance.configuration.TestConfiguration;
 
 public abstract class MultiThreadedTest extends MultiResourceTest
 {
    private static final Logger logger = Logger.getLogger(MultiThreadedTest.class);
+   /**
+    * Use this method to start concurrent executions of the test. This should be used for
+    * initial warm up run of the compiler and stable runs.
+    */
    @Override
-   public void startConcurrentExecutions(int recurseCount, int threadCount, final int profiledCount)
+   public void startConcurrentExecutions(final TestConfiguration configuration)
          throws Exception
    {
       /**
@@ -18,29 +24,30 @@
        * until all workers have finished. The barrier is triggered and all fall out
        * of method frame.
        * This thread is included in the barrier to ensure this thread 
-       * remains active during all demon threads during processing.
+       * remains active during all spawned threads are processing.
        */
-      CyclicBarrier completionBarrier = new CyclicBarrier(1 + threadCount);
+      CyclicBarrier completionBarrier = new CyclicBarrier(1 + configuration.getThreadCount()); // this shared between threads
       
-      for (int count = 1; count <= threadCount; count += 1)
+      for (int count = 1; count <= configuration.getThreadCount(); count += 1)
       {
-         
          Executor executor = new ThreadPerTestWorkerExecutor();
-         executor.execute(getTask( recurseCount,  completionBarrier, count));
+         /* Here we use the count value as the threadIdentity. This allows each thread to 
+          * use (read/write) a record in the database that does not overlap with other threads. 
+          * Avoiding deadlocking in the database. */
+         executor.execute(getTask( new MultithreadedTestConfiguration(configuration.getIterationCount(), completionBarrier, new Long(count)) ));
       }
       completionBarrier.await();// start all the threads processing
-      
+       
       logger.info("All threads have started warmup.");
       if (!completionBarrier.isBroken())
-      {
-         completionBarrier.await();// all threads fall out of method frame
+      {// good, worked so far, now wait for completion of tests
+         completionBarrier.await();// all threads fall out, either by assertion error or all iterations completing
          // wait for all to finish profiled run
-         logger.info("All threads have completed warmup run.");
+         logger.info("All threads have completed run.");
       }
       else
       {
-         logger.warn("The controlling test class did not await for child threads. The barrier was broken.");
+         logger.warn("The controlling test class did not await for child threads. The co-ordinating barrier was broken.");
       }
-      
    }
 }

Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/WarmedUpTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/WarmedUpTest.java	2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/WarmedUpTest.java	2010-02-05 08:43:17 UTC (rev 31422)
@@ -28,6 +28,8 @@
 
 import org.apache.log4j.Logger;
 import org.apache.log4j.NDC;
+import org.jboss.jbossts.performance.configuration.MultithreadedTestConfiguration;
+import org.jboss.jbossts.performance.configuration.TestConfiguration;
 import org.testng.annotations.Parameters;
 import org.testng.annotations.Test;
 
@@ -52,7 +54,7 @@
       throws Exception
    {
       NDC.push("warmup");
-      startConcurrentExecutions(Integer.parseInt(warmupCountConfig), Integer.parseInt(threadCountConfig), Integer.parseInt(profiledCountConfig));
+      startConcurrentExecutions(new TestConfiguration(Integer.parseInt(warmupCountConfig), Integer.parseInt(threadCountConfig), Long.parseLong(profiledCountConfig)));
       NDC.remove();
    }
 
@@ -71,7 +73,7 @@
       {
          logger.warn(e.getMessage());
       }
-      startConcurrentExecutions(Integer.parseInt(testCountConfig), Integer.parseInt(concurrentCountConfig), Integer.parseInt(profiledCountConfig));
+      startConcurrentExecutions(new TestConfiguration(Integer.parseInt(testCountConfig), Integer.parseInt(concurrentCountConfig), Long.parseLong(profiledCountConfig)));
       try
       {
          getProfiler().stop();
@@ -89,9 +91,9 @@
     * @param threadCount 
     * @throws Exception 
     */
-   public abstract void startConcurrentExecutions(final int recurseCount,final  int threadCount,final int profiledCount)  throws Exception;
+   public abstract void startConcurrentExecutions(final TestConfiguration configuration)  throws Exception;
    
-   public abstract Runnable getTask(int recurseCount, CyclicBarrier completionBarrier, int threadId) throws Exception;
+   public abstract Runnable getTask(final MultithreadedTestConfiguration taskConfiguration) throws Exception;
    
    public final class ThreadPerTestWorkerExecutor implements Executor
    {

Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTask.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTask.java	2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTask.java	2010-02-05 08:43:17 UTC (rev 31422)
@@ -22,115 +22,28 @@
 
 package org.jboss.jbossts.performance.jdbc;
 
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Query;
 import javax.transaction.Synchronization;
 
 import org.apache.log4j.Logger;
-import org.apache.log4j.NDC;
 import org.hibernate.Transaction;
 import org.hibernate.ejb.EntityManagerImpl;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.profiler.Profiler;
+import org.jboss.jbossts.performance.task.RecursiveTask;
 import org.jboss.tm.TransactionLocal;
 import org.testng.Assert;
 
-public class JDBCTask implements Runnable
+public class JDBCTask extends RecursiveTask
 {
-   private final EntityManagerFactory emfA;
-   private final EntityManagerFactory emfB;
-   private final boolean isOptionalWriteEnabled;
-   private final int recurseCount;
-   private final Logger logger = Logger.getLogger(JDBCTask.class);
-   private final CyclicBarrier completionBarrier;
-   private final Long threadId;
-   private final CountDownLatch latch;
-   private final Stack parentNDC;
-   public JDBCTask(final EntityManagerFactory factoryA, final EntityManagerFactory factoryB, final int count, final CyclicBarrier barrier, final int id, final boolean isWEnabled, Stack stack)
+    
+   public JDBCTask(final ConcurrentTaskConfiguration taskConfiguration)
    {
-      this.emfA = factoryA;
-      this.emfB = factoryB;
-      this.recurseCount = count;
-      this.completionBarrier = barrier;
-      this.threadId = new Long(id);
-      this.isOptionalWriteEnabled = isWEnabled;
-      this.latch = null;
-      this.parentNDC = stack;
+      super(taskConfiguration);
    }
-   
-   public JDBCTask(final EntityManagerFactory factoryA, final EntityManagerFactory factoryB, final int count, final CyclicBarrier barrier, final int id, final boolean isWEnabled, final CountDownLatch countDown, Stack stack)
-   {
-      this.emfA = factoryA;
-      this.emfB = factoryB;
-      this.recurseCount = count;
-      this.completionBarrier = barrier;
-      this.threadId = new Long(id);
-      this.isOptionalWriteEnabled = isWEnabled;
-      this.latch = countDown;
-      this.parentNDC = stack;
-   }
-   
-   
-   @Override
-   public void run()
-   {
-      try
-      {/* use the barrier to wait for all concurrent threads, when finished decrement the latch and fall out of 
-      method. Means the profiler does not 
-      */
-         NDC.inherit(this.parentNDC);
-         NDC.push(this.threadId.toString());
-         this.completionBarrier.await();// await all threads ready to start
-         recurse(this.recurseCount);
-         logger.info("Finished recursing. Awaiting on synch point.");
 
-         if (!this.completionBarrier.isBroken())
-         {// fall out of thread without waiting.
-            this.completionBarrier.await();// await all threads have finished
-         }
-      }
-      catch (Exception e)
-      {
-         logger.error(e.getMessage(), e);
-         this.completionBarrier.reset();
-         Assert.fail(e.getMessage());
-      }
-      catch (AssertionError ae)
-      {
-         this.completionBarrier.reset();
-         throw ae;
-      }
-      finally
-      {
-         NDC.clear();
-      }
-   }
-   
-   /**
-    * Run the test sequentially.
-    * 
-    * @param count
-    */
-   private void recurse(final int count)
-   {
-      for (int i = 1; i <= count; i += 1)
-      {
-         try
-         {
-            task(i);
-            logger.debug("Completed profiled run of testbody for thread ["+this.threadId.toString()+"] count ["+i+"].");
-            
-         } catch (Exception e)
-         {
-            this.completionBarrier.reset();
-            logger.error("Failed profiled run of testbody for thread ["+this.threadId.toString()+"] count ["+i+"].");
-            Assert.fail(e.getMessage(), e);
-         }
-      }
-   }
    public void task(final int iteration)
       throws Exception
    {
@@ -146,11 +59,9 @@
       transaction.registerSynchronization(new SessionSynchronization(emA, transaction, true));
       transaction.registerSynchronization(new SessionSynchronization( emB, transaction, true  ) );
       
-      //Object obj = emA.createNamedQuery("findUserById").setParameter("identifier", this.threadId).getSingleResult();
       session.set(emA);
       Object[] usrA = (Object[])findUsingResourceA(emA, readSQL);//discard details
       Number version = (Number ) usrA[12];
-      //logger.debug(usrA.toString());
       
       long time = System.currentTimeMillis();
       String writeSQLResA = String.format("update USERS set DEFAULT_BILLING_DETAILS_ID=%1$d, EMAIL='%2$s', FIRSTNAME='%3$s', HOME_CITY='%4$s', HOME_STREET='%5$s', HOME_ZIPCODE='%6$s', LASTNAME='%7$s', PASSWD='%8$s', RANK=%9$d, USERNAME='%10$s', OBJ_VERSION=%11$d where USER_ID=%12$d and OBJ_VERSION=%13$d", 
@@ -169,10 +80,13 @@
             null, "anotheruser at mail.tld", "Ben"+time, "Foocity", "Foostreet", "22222", "User", "secret", 0, "anotheruser110"+this.threadId, version.intValue()+1, this.threadId, version.intValue());
       
       writeUserB(emB, writeSQLResB);
-      //resourceAFlush(emA); flush not necessary
-      //resourceBFlush(emB); flush not necessary
       emA.getSession().getTransaction().commit();
       Assert.assertFalse(emA.isOpen());
+      boolean trip = false;
+      if (trip)
+      {
+         Profiler.
+      }
    }
    private void writeUserA(final EntityManager em, final String sql)
    {

Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTest.java	2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/jdbc/JDBCTest.java	2010-02-05 08:43:17 UTC (rev 31422)
@@ -26,6 +26,8 @@
 
 import org.apache.log4j.NDC;
 import org.jboss.jbossts.performance.MultiThreadedTest;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.configuration.MultithreadedTestConfiguration;
 
 /**
  * This class creates an instance of the underlying task.
@@ -36,9 +38,9 @@
 public class JDBCTest extends MultiThreadedTest
 {
    @Override
-   public Runnable getTask(int recurseCount, CyclicBarrier completionBarrier, int threadId) throws Exception
+   public Runnable getTask(final MultithreadedTestConfiguration taskConfiguration) throws Exception
    {
-      return new JDBCTask(getEntityManagerFactory(), getEntityManagerFactoryB(), recurseCount, completionBarrier, threadId, this.isOptionalWriteEnabled, NDC.cloneStack());
+      return new JDBCTask(new ConcurrentTaskConfiguration(getEntityManagerFactory(), getEntityManagerFactoryB(), taskConfiguration.getIterationCount(), taskConfiguration.getCompletionBarrier(), this.isOptionalWriteEnabled, NDC.cloneStack(), taskConfiguration.getThreadIdentity()));
    }
    @Override
    protected boolean isMultiResourceTest()

Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeResourcesTest.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeResourcesTest.java	2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeResourcesTest.java	2010-02-05 08:43:17 UTC (rev 31422)
@@ -22,18 +22,23 @@
 
 package org.jboss.jbossts.performance.resource;
 
-import java.util.concurrent.CyclicBarrier;
-
 import org.apache.log4j.NDC;
 import org.jboss.jbossts.performance.MultiThreadedTest;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.configuration.MultithreadedTestConfiguration;
 
+/**
+ * This is in all intents and purposes has ended up being a factory.
+ * @author whitingjr
+ *
+ */
 public class SynchronizeResourcesTest extends MultiThreadedTest
 {
    @Override
-   public Runnable getTask(final int recurseCount, final CyclicBarrier completionBarrier, final int id)
+   public Runnable getTask(final MultithreadedTestConfiguration taskConfiguration)
       throws Exception
    {
-      return new SynchronizeTask(getEntityManagerFactory(), getEntityManagerFactoryB(), recurseCount, completionBarrier, id, this.isOptionalWriteEnabled, NDC.cloneStack());
+      return new SynchronizeTask(new ConcurrentTaskConfiguration( getEntityManagerFactory(), getEntityManagerFactoryB(), taskConfiguration.getIterationCount(), taskConfiguration.getCompletionBarrier(),  this.isOptionalWriteEnabled, NDC.cloneStack(), taskConfiguration.getThreadIdentity()));
    }
    @Override
    protected boolean isMultiResourceTest()

Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeTask.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeTask.java	2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/resource/SynchronizeTask.java	2010-02-05 08:43:17 UTC (rev 31422)
@@ -1,18 +1,15 @@
 package org.jboss.jbossts.performance.resource;
 
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-
 import javax.persistence.EntityManager;
 import javax.persistence.EntityManagerFactory;
 import javax.persistence.Query;
 import javax.transaction.Synchronization;
 
 import org.apache.log4j.Logger;
-import org.apache.log4j.NDC;
 import org.hibernate.Transaction;
 import org.hibernate.ejb.EntityManagerImpl;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.task.RecursiveTask;
 import org.jboss.tm.TransactionLocal;
 import org.testng.Assert;
 
@@ -21,82 +18,30 @@
 
 
 /**
- * This is the task that will.
- * A SynchronizeTask.
+ * This is the task that will read and write changes to two separate databases. 
+ * The implementation uses an ORM to handle the find and write operations. JTA
+ * is used to co-ordinate a distributed transaction.
+ * The implementation is a little clunky but it does what's necessary. The requirement
+ * was to avoid depending on using an EJB container and use the Hibernate APIs and internals
+ * to switch between native and xa functionality without code changes.
  * 
  * @author <a href="jwhiting at redhat.com">Jeremy Whiting</a>
  * @version $Revision: 1.1 $
  */
-public class SynchronizeTask implements Runnable
+public class SynchronizeTask extends RecursiveTask
 {
-   private final EntityManagerFactory emfA;
-   private final EntityManagerFactory emfB;
-   private final boolean isOptionalWriteEnabled;
-   private final int recurseCount;
-   private final static Logger logger = Logger.getLogger(SynchronizeTask.class);
-   private final CyclicBarrier completionBarrier;
-   private final Long threadId;
-   private final CountDownLatch latch;
-   private final Stack parentNDC;
-   public SynchronizeTask(final EntityManagerFactory factoryA, final EntityManagerFactory factoryB, final int count, final CyclicBarrier barrier, final int id, final boolean isWEnabled, Stack stack)
-   {
-      this.emfA = factoryA;
-      this.emfB = factoryB;
-      this.recurseCount = count;
-      this.completionBarrier = barrier;
-      this.threadId = new Long(id);
-      this.isOptionalWriteEnabled = isWEnabled;
-      this.latch = null;
-      this.parentNDC = stack;
-   }
    
-   public SynchronizeTask(final EntityManagerFactory factoryA, final EntityManagerFactory factoryB, final int count, final CyclicBarrier barrier, final int id, final boolean isWEnabled, final CountDownLatch countDown, Stack stack)
+   public SynchronizeTask(final ConcurrentTaskConfiguration taskConfiguration)
    {
-      this.emfA = factoryA;
-      this.emfB = factoryB;
-      this.recurseCount = count;
-      this.completionBarrier = barrier;
-      this.threadId = new Long(id);
-      this.isOptionalWriteEnabled = isWEnabled;
-      this.latch = countDown;
-      this.parentNDC = stack;
+      super(taskConfiguration);
    }
-   @Override
-   public void run()
-   {
-      try
-      {/* use the barrier to wait for all concurrent threads, when finished decrement the latch and fall out of 
-      method. 
-      */
-         NDC.inherit(this.parentNDC);
-         NDC.push(this.threadId.toString());
-         this.completionBarrier.await();// await all threads ready to start
-         recurse(this.recurseCount);
-         logger.info("Finished recursing. Awaiting on synch point.");
 
-         if (!this.completionBarrier.isBroken())
-         {// fall out of thread without waiting.
-            this.completionBarrier.await();// await all threads have finished
-         }
-      }
-      catch (Exception e)
-      {
-         logger.error(e.getMessage(), e);
-         this.completionBarrier.reset();
-         Assert.fail(e.getMessage());
-      }
-      catch (AssertionError ae)
-      {
-         this.completionBarrier.reset();
-         throw ae;
-      }
-      finally
-      {
-         NDC.clear();
-      }
-   }
-   
-   public void task()
+   /**
+    * This is the task which performs a read and (optional) write to the resource.
+    * Uses ORM to handle entity bean modification and persistence of changes.
+    */
+   @Override
+   public void task(final int iteration)
       throws Exception
    {
       EntityManagerImpl emA = null;
@@ -122,7 +67,6 @@
       userDAO_B.setEntityManagerB(emB);
 
       // Prepare a user object
-      //User user = userDAO_A.findById(this.threadId, false);
       session.set(emA);
       User userA = findUsingResourceA(userDAO_A);
 
@@ -142,31 +86,8 @@
       
       resourceAFlush(emA);
       resourceBFlush(emB);
-      //emA.getSession().setReadOnly(user, true);
       emA.getSession().getTransaction().commit();
    }
-   /**
-    * Run the test sequentially.
-    * 
-    * @param count
-    */
-   private void recurse(final int count)
-   {
-      for (int i = 0; i < count; i += 1)
-      {
-         try
-         {
-            task();
-            logger.debug("Completed profiled run of testbody for thread ["+this.threadId.toString()+"] count ["+i+"].");
-            
-         } catch (Exception e)
-         {
-            this.completionBarrier.reset();
-            logger.error("Failed profiled run of testbody for thread ["+this.threadId.toString()+"] count ["+i+"].");
-            Assert.fail(e.getMessage(), e);
-         }
-      }
-   }
    
    public EntityManagerFactory getEMFactoryA()
    {
@@ -178,6 +99,11 @@
       return this.emfB;
    }
    
+   /**
+    * This class provides a synchronziation of the second resource when the first commits.
+    * @author whitingjr
+    *
+    */
    public class SessionSynchronization implements Synchronization
    {
       private final Logger logger = Logger.getLogger(SessionSynchronization.class);
@@ -192,6 +118,7 @@
 
       public void beforeCompletion()
       {
+//         this.manager.flush();
       }
 
       public void afterCompletion(int status)

Modified: labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/task/RecursiveTask.java
===================================================================
--- labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/task/RecursiveTask.java	2010-02-05 08:41:58 UTC (rev 31421)
+++ labs/jbosstm/workspace/whitingjr/trunk/performance/src/test/java/org/jboss/jbossts/performance/task/RecursiveTask.java	2010-02-05 08:43:17 UTC (rev 31422)
@@ -22,11 +22,105 @@
 
 package org.jboss.jbossts.performance.task;
 
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+
+import javax.persistence.EntityManagerFactory;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+import org.jboss.jbossts.performance.configuration.ConcurrentTaskConfiguration;
+import org.jboss.jbossts.performance.resource.SynchronizeTask;
+import org.testng.Assert;
+
 /**
  * The purpose of this object is to provide recursive behaviour for a test case task.
- * Repeatedly call the 
+ * Repeatedly calling the body of a test case. The test case method is task rather than
+ * using the @Test annotation. 
+ * This class also provides disabling of profiling for a thread. This means profiling
+ * 
  */
-public class RecursiveTask
+public abstract class RecursiveTask implements Runnable
 {
+   protected final EntityManagerFactory emfA;
+   protected final EntityManagerFactory emfB;
+   protected final boolean isOptionalWriteEnabled;
+   private final int recurseCount;
+   private final static Logger logger = Logger.getLogger(SynchronizeTask.class);
+   private final CyclicBarrier completionBarrier;
+   //private final CountDownLatch latch;
+   protected final Long threadId;
+   private final Stack parentNDC;
+   private final ConcurrentTaskConfiguration taskConfiguration;
+   public RecursiveTask(final ConcurrentTaskConfiguration configuration)
+   {
+      this.emfA = configuration.getEmfactoryA();
+      this.emfB = configuration.getEmfactoryB();
+      this.recurseCount = configuration.getIterationCount();
+      this.completionBarrier = configuration.getCompletionBarrier();
+      this.isOptionalWriteEnabled = configuration.isOptionalWriteEnabled();
+      //this.latch = null;
+      this.parentNDC = configuration.getNDCstack();
+      this.threadId = configuration.getThreadIdentity();
+      this.taskConfiguration = configuration;
+   }
+   @Override
+   public void run()
+   {
+      try
+      {/* use the barrier to wait for all concurrent threads, when finished decrement the latch and fall out of 
+      method. 
+      */
+         NDC.inherit(this.parentNDC);
+         NDC.push(this.threadId.toString());
+         this.completionBarrier.await();// await all threads ready to start
+         recurse(this.recurseCount);
+         logger.info("Finished recursing. Awaiting on synch point.");
 
+         if (!this.completionBarrier.isBroken())
+         {// fall out of thread without waiting.
+            this.completionBarrier.await();// await all threads have finished
+         }
+      }
+      catch (Exception e)
+      {
+         logger.error(e.getMessage(), e);
+         this.completionBarrier.reset();
+         Assert.fail(e.getMessage());
+      }
+      catch (AssertionError ae)
+      {
+         this.completionBarrier.reset();
+         throw ae;
+      }
+      finally
+      {
+         NDC.clear();
+      }
+   }
+   protected abstract void task(final int iteration) throws Exception;
+   
+   /**
+    * Run the test sequentially.
+    * 
+    * @param count
+    */
+   private void recurse(final int count)
+   {
+      for (int i = 0; i < count; i += 1)
+      {
+         try
+         {
+            task(i);
+            logger.debug(String.format("Completed profiled run of testbody for thread [%1$d] count [%2$d].", this.threadId, i));
+            
+         } catch (Exception e)
+         {
+            this.completionBarrier.reset();
+            logger.error(String.format("Failed profiled run of testbody for thread [%1$d] count [$2$d].", this.threadId, i ));
+            Assert.fail(e.getMessage(), e);
+         }
+      }
+   }
 }



More information about the jboss-svn-commits mailing list