[jboss-svn-commits] JBL Code SVN: r26908 - labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Jun 10 12:22:55 EDT 2009


Author: adinn
Date: 2009-06-10 12:22:55 -0400 (Wed, 10 Jun 2009)
New Revision: 26908

Added:
   labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TaskReaper.java
Modified:
   labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/Task.java
   labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TaskImpl.java
   labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TestGroupBase.java
   labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/XML2JUnit.java
Log:
added a task reaper and modified task implementation to register and
deregister tasks as they come and go and otherwise play ball as regards
termination of the task

added logging of task process stdout/stderr to files -- output goes to

  ./testoutput/threadgroup/testid/xxxn_output.txt

where xxxn is e.g. client0, server1, etc and lines are prefixed with
either "err: " or "out: "

the common server task server0 started in the @SetUp method currently
logs its output to

  ./testoutput/threadgroup/server0_output.txt

and consequently gets overwritten at each test.

defaulted task timeout to 600 seconds in code generator. really it needs to be
set using the timeout specified in the test definition.


Modified: labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/Task.java
===================================================================
--- labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/Task.java	2009-06-10 13:45:53 UTC (rev 26907)
+++ labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/Task.java	2009-06-10 16:22:55 UTC (rev 26908)
@@ -29,15 +29,41 @@
 {
     public enum TaskType { EXPECT_PASS_FAIL, EXPECT_READY }
 
+    /**
+     * execute a type PASS_FAIL task in a subprocess passing no arguments to the Main method of the
+     * implementing class then wait for the subprocess to exit.
+     */
     public void perform();
 
+    /**
+     * execute a type PASS_FAIL task in a subprocess passing the supplied arguments to the Main method
+     * of the implementing class then wait for the subprocess to exit.
+     * @param params arguments to supply to the main method of the implementing class
+     */
     public void perform(String... params);
 
+    /**
+     * execute a type PASS_FAIL or type READY task asynchronously in a subprocess passing no arguments to the Main
+     * method of the implementing class. if the task type is READY do not return until it has printed Ready.
+     */
     public void start();
 
+    /**
+     * execute a type PASS_FAIL or type READY task asynchronously in a subprocess passing the supplied arguments to
+     * the Main method of the implementing class. if the task type is READY do not return until it has printed
+     * Ready.
+     * @param params arguments to supply to the main method of the implementing class
+     */
     public void start(String... params);
 
+    /**
+     * check that a type PASS_FAIL task which was started asynchronously has printed either Passed or Failed
+     * and exited cleanly, waiting if it has not yet completed and asserting if either condition fails.
+     */
     public void waitFor();
 
+    /**
+     * terminate a type READY task
+     */
     public void terminate();
 }

Modified: labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TaskImpl.java
===================================================================
--- labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TaskImpl.java	2009-06-10 13:45:53 UTC (rev 26907)
+++ labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TaskImpl.java	2009-06-10 16:22:55 UTC (rev 26908)
@@ -24,17 +24,21 @@
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.io.FileInputStream;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
+import java.io.*;
 
 /**
- * Implementation of the Task abstraction, essentially a test aware wrapper around a spawned process.
+ * Reimplementation of the Task abstraction. This version cooperates with a task reaper which times out
+ * tasks whose underlying subprocess has become wedged. This can be used to ensure that the thread running
+ * a junit test does not get wedged by a badly behaving subprocess. The task reaper also provides a means
+ * to terminate any left over tasks during the test tearDown.
  *
  * @author Jonathan Halliday (jonathan.halliday at redhat.com) 2009-05
+ * @author Andrew Dinn (adinn at redhat.com) 2009-06
  */
 public class TaskImpl implements Task
 {
+    public static int DEFAULT_TIMEOUT_SECONDS = 600;
+
     private final static String PROPERTIES_FILE = "TaskImpl.properties";
 
     private static Properties properties;
@@ -47,59 +51,153 @@
         }
     }
 
-    private static final Set<TaskImpl> tasks = new HashSet();
+    /**
+     * a class implementing a main method which gets executed in a JVM running in a subprocess in order
+     * to perform this task
+     */
+    private Class clazz;
 
-    //////////////////
+    /**
+     * identifies whether the task is a test which is expected to print Passed or Failed or a service
+     * which is expected to print Ready.
+     */
+    private TaskType type;
 
-    private Class clazz;
-    private TaskType type;
-    private boolean isRunning;
+    /**
+     * a timeout in seconds from when the task is started after which the task should be destroyed
+     * automatically by the task reaper if it has not already been destroyed
+     */
+    private int timeout;
+
+    /**
+     * A handle on the process which was created to execute the task
+     */
+    Process process;
+
+    /**
+     * tasks can only be started once. this flag guards against repeated start attempts
+     */
+    private boolean started;
+    /**
+     * flag identifying whether or not this task's subprocess has finished executing
+     */
     private boolean isDone;
-    private boolean isChecked;
-    Process process;
+    /**
+     * a flag which is set if the task's process times out and gets destroyed before the task's subprocess
+     * has finished executing.
+     */
+    private boolean isTimedOut;
+    /**
+     * an output stream to which the contents of the task's stdout and stderr are redirected.
+     */
+    private PrintStream out;
+    /**
+     * a thread which reads the task's stdout stream and identifies whether or not a Passed/Failed
+     * or a Ready message has been printed. the reader thread also forwards the output to the task's output stream.
+     */
     TaskReaderThread taskReaderThread;
 
-    TaskImpl(Class clazz, Task.TaskType type)
+    /**
+     * a thread which reads the task's stderr stream and forwards each line to the task's output
+     * stream with an "err: " prefix. it does not look for Passed, Failed or Ready messages.
+     */
+    TaskErrorReaderThread taskErrorReaderThread;
+
+    /**
+     * create a new task with the default timeout and with output directed to System.out
+     * @param clazz the task whose main method is to be executed in a JVM running in a subprocess
+     * @param type the type of the test either PASS_FAIL or READY
+     */
+    TaskImpl(Class clazz, TaskType type)
     {
+        this(clazz, type, System.out, DEFAULT_TIMEOUT_SECONDS);
+    }
+
+    /**
+     * create a new task
+     * @param clazz the task whose main method is to be executed in a JVM running in a subprocess
+     * @param type the type of the test either PASS_FAIL or READY
+     * @param out the output stream to which output from the task's process should be redirected.
+     * @param timeout the timeout for the task in seconds
+     */
+    TaskImpl(Class clazz, TaskType type, PrintStream out, int timeout)
+    {
         if(clazz == null || type == null) {
             throw new ExceptionInInitializerError("TaskImpl()<ctor> params may not be null");
         }
 
         this.clazz = clazz;
         this.type = type;
+        this.timeout = timeout;
+        this.out = out;
+        this.started = false;
+        this.isDone = false;
+        this.isTimedOut = false;
+        this.taskReaderThread = null;
     }
 
+    /**
+     * execute a type PASS_FAIL task in a subprocess passing no arguments to the Main method of the
+     * implementing class then wait for the subprocess to exit.
+     */
     public void perform() {
         perform((String[])null);
     }
 
+    /**
+     * execute a type PASS_FAIL task in a subprocess passing the supplied arguments to the Main method
+     * of the implementing class then wait for the subprocess to exit.
+     * @param params arguments to supply to the main method of the implementing class
+     */
     public void perform(String... params)
     {
         if(type != TaskType.EXPECT_PASS_FAIL) {
             throw new RuntimeException("can't perform an EXPECT_READY task");
         }
 
-        if(isDone || isChecked || isRunning) {
-            throw new RuntimeException("invalid state");
-        }
+        boolean printedPassed = false;
+        boolean printedFailed = false;
 
         String[] command = assembleCommand(clazz.getCanonicalName(), params);
 
-        Assert.assertTrue(tasks.add(this));
-/*
-        int i = 0;
-        for(String string : command) {
-            System.out.println("["+i+"] "+string);
-            i++;
+        // cannot restart a task
+
+        synchronized(this) {
+            if(started) {
+                throw new RuntimeException("invalid state for perform");
+            }
+            // first make sure we can create a subprocess
+
+            try {
+                // process = Runtime.getRuntime().exec(command);
+                ProcessBuilder builder = new ProcessBuilder(command);
+                // !!!! we cannot do this as we cannot be sure the spawned task does not
+                // interleave its stdout and stderr streams. if it does then this messes up detection
+                // of Ready, Passed and Failed output lines since they may get mingled with lines
+                // from the error stream. !!!!
+                // redirect errors to stdout -- avoids getting wedged when error output stream becomes full
+                // builder.redirectErrorStream(true);
+                process = builder.start();
+            } catch (Exception e) {
+                Assert.fail(e.toString());
+            }
+
+            // ok, we have started so register with the task reaper -- need to synchronize so we can set
+            // started atomically
+
+            TaskReaper.getReaper().insert(this, timeout * 1000);
+
+            started = true;
         }
-*/
-        System.out.println(clazz.getName());
+        // this is a PASS_FAIL test and we need to wait for it to complete
+        try {
+            // create an error stream reader to merge error output into the output file
+            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+            taskErrorReaderThread = new TaskErrorReaderThread(bufferedReader, out, "err: ");
+            taskErrorReaderThread.start();
 
-        boolean printedPassed = false;
-        boolean printedFailed = false;
-        try {
-            process = Runtime.getRuntime().exec(command);
-            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            // now read stdout checking for passed or failed
+            bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
             String line;
             while((line = bufferedReader.readLine()) != null) {
                 if("Passed".equals(line)) {
@@ -108,130 +206,265 @@
                 if("Failed".equals(line)) {
                     printedFailed = true;
                 }
-                System.out.println("Line: "+line);
+                // need to redirect to file
+
+                out.println("out: " + line);
             }
+        } catch (Exception e) {
+            // if we fail here then the reaper task will clean up the thread
+            Assert.fail(e.toString());
+        }
 
+        // we cannot really protect properly against races here because process.waitFor takes its
+        // own lock before then suspending so there will always be a window here. luckily process.destroy()
+        // which gets called by the reaper is idempotent, meaning the only thing we risk is that the task
+        // reaper may call destroy and register a stalled thread even though we get through the waitFor
+        // call.
+        try {
             process.waitFor();
-            Assert.assertEquals(0, process.exitValue());
-
         } catch (Exception e) {
             Assert.fail(e.toString());
         }
 
+        // ok, now ensure we sort out the race between the reaper task and this one
+        synchronized(this) {
+            if (!isTimedOut) {
+                // we got through the waitFor and relocked this before we could be timed out so remove the
+                // task from the reaper list
+                TaskReaper.getReaper().remove(this);
+            }
+            // setting this will forestall any pending attempt to timeout this task
+            isDone = true;
+        }
+        // we barf if we didn't exit with status 0, if we printed Failed or if we did not print Passed
 
-        Assert.assertTrue(tasks.remove(this));
-
+        Assert.assertEquals(0, process.exitValue());
         Assert.assertFalse(printedFailed);
         Assert.assertTrue(printedPassed);
 
-        isDone = true;
-        isChecked = true;
+        // clean exit --  hurrah!
     }
 
+    /**
+     * execute a type PASS_FAIL or type READY task asynchronously in a subprocess passing no arguments to the Main
+     * method of the implementing class. if the task type is READY do not return until it has printed Ready.
+     */
     public void start()
     {
         start((String[])null);
     }
 
+    /**
+     * execute a type PASS_FAIL or type READY task asynchronously in a subprocess passing the supplied arguments to
+     * the Main method of the implementing class. if the task type is READY do not return until it has printed
+     * Ready.
+     * @param params arguments to supply to the main method of the implementing class
+     */
     public void start(String... params)
     {
-        if(isDone || isChecked || isRunning) {
-            throw new RuntimeException("invalid state");
-        }
+        String[] command = assembleCommand(clazz.getCanonicalName(), params);
 
-        String[] command = assembleCommand(clazz.getCanonicalName(), params);
-/*
-        int i = 0;
-        for(String string : command) {
-            System.out.println(string+" ");
-            //System.out.println("["+i+"] "+string);
-            i++;
+        // cannot restart a task
+
+        synchronized(this) {
+            if(started) {
+                throw new RuntimeException("invalid state for start");
+            }
+
+            // first make sure we can create a subprocess
+            try {
+                // process = Runtime.getRuntime().exec(command);
+                ProcessBuilder builder = new ProcessBuilder(command);
+                // !!!! we cannot do this as we cannot be sure the spawned task does not
+                // interleave its stdout and stderr streams. if it does then this messes up detection
+                // of Ready, Passed and Failed output lines since they may get mingled with lines
+                // from the error stream. !!!!
+                // redirect errors to stdout -- avoids getting wedged when error output stream becomes full
+                // builder.redirectErrorStream(true);
+                process = builder.start();
+            } catch (Exception e) {
+                Assert.fail(e.toString());
+            }
+
+            TaskReaper.getReaper().insert(this, timeout * 1000);
+
+            started = true;
         }
-*/
-        Assert.assertTrue(tasks.add(this));
 
-        System.out.println(clazz.getName());
+        // set up threads to do the I/O processing
 
         try {
-            process = Runtime.getRuntime().exec(command);
+            // create an error stream reader to merge error output into the output file
+            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+            taskErrorReaderThread = new TaskErrorReaderThread(bufferedReader, out, "err: ");
+            taskErrorReaderThread.start();
 
-            taskReaderThread = new TaskReaderThread( new BufferedReader(new InputStreamReader(process.getInputStream())) );
+            bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            taskReaderThread = new TaskReaderThread( bufferedReader, out, "out: ");
             taskReaderThread.start();
 
-            // TODO deal with it printing Failed
-
-            TaskReaderThread stdErr = new TaskReaderThread( new BufferedReader(new InputStreamReader(process.getErrorStream())) );
-            stdErr.start(); // if we don't consume it the pipe fills up and the process blocks.
-
             if(type.equals(TaskType.EXPECT_READY)) {
                 taskReaderThread.blockingWaitForReady();
-                System.out.println("got ready");
+                // System.out.println("got ready");
             }
 
         } catch (Exception e) {
             Assert.fail(e.toString());
         }
-
-        isRunning = true;
     }
 
+    /**
+     * check that a type PASS_FAIL task which was started asynchronously has printed either Passed or Failed
+     * and exited cleanly, waiting if it has not yet completed and asserting if either condition fails.
+     */
     public void waitFor()
     {
-        if(isDone || isChecked || !isRunning) {
-            throw new RuntimeException("invalid state");
+        if(type.equals(TaskType.EXPECT_READY)) {
+            Assert.fail("should not waitFor EXPECT_READY tasks (use terminate)");
         }
 
-        Assert.assertTrue(tasks.remove(this));
+        synchronized(this) {
+            if (isDone || !started) {
+                throw new RuntimeException("invalid state for waitFor");
+            }
 
+            if (isTimedOut) {
+                throw new RuntimeException("wait for timed out task");
+            }
+        }
+
+        // we cannot really protect properly against races here because process.waitFor takes its
+        // own lock before then suspending so there will always be a window here. luckily process.destroy()
+        // which gets called by the reaper is idempotent, meaning the only thing we risk is that the task
+        // reaper may call destroy and register a stalled thread even though we get through the waitFor
+        // call.
+
         try {
             process.waitFor();
-        } catch(Exception e) {
+        } catch (Exception e) {
             Assert.fail(e.toString());
         }
+
+        // ok, now ensure we sort out the race between the reaper task and this one
+
+        synchronized(this) {
+            if (!isTimedOut) {
+                // we got through the waitFor and relocked this before we could be timed out so remove the
+                // task from the reaper list
+                TaskReaper.getReaper().remove(this);
+            }
+            // setting this will forestall any pending attempt to timeout this task
+            isDone = true;
+        }
+
+        // throw up if we didn't exit with exit code 0
         Assert.assertEquals(0, process.exitValue());
 
+        // the taskReaderThread will throw up if it did not get a clean finish or get a Passed and no Failed
         taskReaderThread.checkIsFinishedCleanly();
+        taskReaderThread.checkPassFail();
 
-        if(type.equals(TaskType.EXPECT_PASS_FAIL)) {
-            taskReaderThread.checkPassFail();
-        } else {
-            Assert.fail("should not waitFor EXPECT_READY tasks (use terminate)");
-        }
-
-        isRunning = false;
-        isDone = true;
-        isChecked = true;
+        // clean exit --  hurrah!
     }
 
+    /**
+     * terminate a type READY task
+     */
     public void terminate()
     {
-        if(isDone || isChecked || !isRunning) {
-            throw new RuntimeException("invalid state");
+        if(type.equals(TaskType.EXPECT_PASS_FAIL)) {
+            Assert.fail("Should not terminate EXPECT_PASS_FAIL tasks (use waitFor)");
         }
 
-        Assert.assertTrue(tasks.remove(this));
+        synchronized(this) {
+            if (isDone || !started) {
+                throw new RuntimeException("invalid state for terminate");
+            }
 
-        try {
-            process.destroy();
-        } catch(Exception e) {
-            Assert.fail(e.toString());
+            if (isTimedOut) {
+                throw new RuntimeException("terminate for timed out task");
+            }
+
+            TaskReaper.getReaper().remove(this);
+
+            // setting this will forestall any pending attempt to timeout this task
+            isDone = true;
         }
 
+        if (taskReaderThread != null) {
+            taskErrorReaderThread.shutdown();
+        }
 
-        if(!type.equals(TaskType.EXPECT_READY)) {
-            Assert.fail("Should not terminate EXPECT_PASS_FAIL tasks (use waitFor)");
+        if (taskReaderThread != null) {
+            // tell the reader not to throw a wobbly before we destroy the process
+            taskReaderThread.shutdown();
         }
 
-        isDone = true;
-        isChecked = true;
+        process.destroy();
     }
 
+    /**
+     * ensure that any tasks started during the last test run are killed
+     */
+    public static void cleanupTasks()
+    {
+        TaskReaper.getReaper().clear();
+    }
+
     /////////////////////////
 
+    /**
+     * ensure that there are no tasks left running, clearing out all such tasks and asserting if this
+     * is not the case
+     */
     public static void assertNoTasks() {
-        Assert.assertEquals(0, tasks.size());
+        // Assert.assertEquals(0, tasks.size());
+        Assert.assertTrue(TaskReaper.getReaper().clear() == 0);
     }
 
+    /////////////////////////
+    // package public for use by TaskReaper and internally
+
+    /**
+     * destroy the subprocess associated with this task
+     * @return true if the process was timed out and false if it exited before we got there
+     */
+    boolean timeout()
+    {
+        synchronized(this) {
+            if (isDone) {
+                return false;
+            } else {
+                isTimedOut = true;
+            }
+
+            if (taskReaderThread != null) {
+                taskErrorReaderThread.shutdown();
+            }
+
+            if (taskReaderThread != null) {
+                // tell the reader not to throw a wobbly before we destroy the process
+                taskReaderThread.shutdown();
+            }
+
+            out.println("!!!TASK TIME OUT!!!");
+            // we timed out before the process managed to complete so kill it now
+            // n.b. since this closes the process stdout we can be sure that the task
+            // reader thread will exit.
+            process.destroy();
+        }
+        return true;
+    }
+    /////////////////////////
+    // private implementation
+
+    /**
+     * construct an executable command line to execute the supplied java class with the arguments in params,
+     * substituting those in the form $(...) with corresponding values derived from the properties file.
+     * @param classname
+     * @param params
+     * @return the command line as an array of strings
+     */
     private String[] assembleCommand(String classname, String[] params) {
         params = substituteParams(params);
 
@@ -259,6 +492,12 @@
         return list.toArray(new String[list.size()]);
     }
 
+    /**
+     * construct a copy of the supplied argument String array replacing all terms in the form $(...) with
+     * corresponding values derived from the properties file
+     * @param params the original arguments to be copied
+     * @return the substituted copy
+     */
     private String[] substituteParams(String[] params) {
         if(params == null || params.length == 0) {
             return null;
@@ -280,18 +519,30 @@
         return result;
     }
 
+    /**
+     * a thread created whenever a task is started asynchronously to forward output from the task's process
+     * to an output stream and to detect printing of Ready, Passed and Failed output lines.
+     */
     private class TaskReaderThread extends Thread {
 
         BufferedReader bufferedReader;
+        PrintStream out;
+        private String prefix;
 
         private final AtomicBoolean printedReady = new AtomicBoolean(false);
         private final AtomicBoolean isFinishedCleanly = new AtomicBoolean(false);
         private volatile boolean printedPassed = false;
         private volatile boolean printedFailed = false;
+        private final AtomicBoolean shutdown = new AtomicBoolean(false);
 
+        /**
+         * called by the test thread under Task.start() to ensure that a type READY task's process has printed a
+         * Ready output line before returning from the start call.
+         */
         public void blockingWaitForReady() {
             synchronized (printedReady) {
-                while(!printedReady.get()) {
+                // test of shutdown ensures we exit this loop if we get destroyed on a timeout
+                while(!printedReady.get() && !shutdown.get()) {
                     try {
                         printedReady.wait();
                     } catch (InterruptedException e) {
@@ -299,8 +550,16 @@
                     }
                 }
             }
+
+            // make sure the test fails if we did not see ready
+
+            Assert.assertTrue(printedReady.get());
         }
 
+        /**
+         * called by the test thread to ensure that a type PASS_FAIL or READY task's process has exited and
+         * closed its output stream cleanly, thereby causing the reader thread to exit cleanly.
+         */
         public void checkIsFinishedCleanly() {
 
             try {
@@ -310,16 +569,28 @@
             }
 
             Assert.assertTrue(isFinishedCleanly.get());
-            Assert.assertFalse(printedFailed);
         }
 
+        /**
+         * called by the test thread to ensure that a type PASS_FAIL task's process has printed a Passed
+         * output line and has not printed a Failed output line.
+         */
         public void checkPassFail() {
             Assert.assertFalse(printedFailed);
             Assert.assertTrue(printedPassed);
         }
 
+        /**
+         * create a task reader thread defaulting the prefix to "out: " and the output stream to
+         * System.out
+         */
         public TaskReaderThread(BufferedReader bufferedReader) {
+            this(bufferedReader, System.out, "out: ");
+        }
+        public TaskReaderThread(BufferedReader bufferedReader, PrintStream out, String prefix) {
             this.bufferedReader = bufferedReader;
+            this.prefix = prefix;
+            this.out = out;
         }
 
         public void run() {
@@ -338,17 +609,74 @@
                     if("Failed".equals(line)) {
                         printedFailed = true;
                     }
-                    System.out.println("Line: "+line);
+                    out.println(prefix + line);
                 }
 
-                synchronized (isFinishedCleanly) {
-                    isFinishedCleanly.set(true);
-                    isFinishedCleanly.notify();
+                isFinishedCleanly.set(true);
+            } catch(Exception e) {
+                // make sure no one is waiting for a ready
+                synchronized(printedReady) {
+                    printedReady.notify();
                 }
+                
+                // if the process is explicitly destroyed we can see an IOException because the output
+                // stream gets closed under the call to readLine(). shutdown is set before destroy
+                // is called so only trace the exception if we are not shut down
+                if (shutdown.get()) {
+                    return;
+                }
+                out.println("TaskReaderThread : exception before shutdown " + e);
+                e.printStackTrace(out);
+            }
+        }
+
+        public void shutdown() {
+            shutdown.set(true);
+        }
+    }
+    /**
+     * a thread created whenever a task is started asynchronously to forward error output from the task's
+     * process to an output stream.
+     */
+    private class TaskErrorReaderThread extends Thread {
+
+        BufferedReader bufferedReader;
+        PrintStream out;
+        private String prefix;
+        private AtomicBoolean shutdown = new AtomicBoolean(false);
+
+        public TaskErrorReaderThread(BufferedReader bufferedReader)
+        {
+            this(bufferedReader, System.err, "err: ");
+        }
+
+        public TaskErrorReaderThread(BufferedReader bufferedReader, PrintStream out, String prefix)
+        {
+            this.bufferedReader = bufferedReader;
+            this.out = out;
+            this.prefix = prefix;
+        }
+
+        public void run() {
+            try {
+                String line;
+                while((line = bufferedReader.readLine()) != null) {
+                    out.println(prefix + line);
+                }
             } catch(Exception e) {
-                e.printStackTrace();
+                // if the process is explicitly destroyed we can see an IOException because the output
+                // stream gets closed under the call to readLine(). shutdown is set before destroy
+                // is called so only trace the exception if we are not shut down
+                if (shutdown.get()) {
+                    return;
+                }
+                out.println("TaskErrorReaderThread : exception before shutdown " + e);
+                e.printStackTrace(out);
             }
         }
+        
+        public void shutdown() {
+            shutdown.set(true);
+        }
     }
-
 }
\ No newline at end of file

Added: labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TaskReaper.java
===================================================================
--- labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TaskReaper.java	                        (rev 0)
+++ labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TaskReaper.java	2009-06-10 16:22:55 UTC (rev 26908)
@@ -0,0 +1,335 @@
+package org.jboss.jbossts.qa.junit;
+
+import java.util.SortedSet;
+import java.util.HashMap;
+import java.util.TreeSet;
+import java.util.Iterator;
+
+/**
+ * Task manager which ensures that processes created during unit tests are destroyed when they
+ * do not complete after a suitable timeout.
+ */
+public class TaskReaper
+{
+    // api methods
+    // n.b  all internal methods synchronize on the value in the reaperLock field when they need
+    // exclusive access to the internal state. note that the api methods are not synchronized but
+    // they should not be called concurrently.
+
+    /**
+     * register a task with the reaper and wake the reaper thread so that it takes the new entry
+     * into account when working out how long to sleep
+     * @param task the task to be inserted
+     * @param absoluteTimeout the timeout value handed on to the TaskReapable wrapper created for the task
+     * @throws RuntimeException if the reaper has already been flagged as shut down
+     */
+    public void insert(TaskImpl task, long absoluteTimeout)
+    {
+        synchronized(reaperLock) {
+            if (!shutdown) {
+                TaskReapable entry = new TaskReapable(task, absoluteTimeout);
+                taskList.add(entry);
+                reapableMap.put(task, entry);
+                // wake the reaper thread
+                reaperLock.notify();
+            } else {
+                throw new RuntimeException("invalid call to TaskReaper.insert after shutdown");
+            }
+        }
+    }
+
+    /**
+     * remove a task from the list of managed tasks
+     * @param task the task to be removed
+     * @return true if the task was still registered and was removed by this call, false if it was not
+     * registered (for example because a timeout already caused its process to be destroyed)
+     */
+    public boolean remove(TaskImpl task)
+    {
+        synchronized(reaperLock) {
+            // drop the map entry as well as the list entry so that the map does not retain
+            // finished tasks and a repeated remove of the same task reports false
+            TaskReapable reapable = reapableMap.remove(task);
+            if (reapable != null) {
+                taskList.remove(reapable);
+                // notify the reaper thread
+                reaperLock.notify();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * report whether the list of managed tasks is currently empty
+     * @return true if no tasks are in the list otherwise false
+     */
+    public boolean allClear()
+    {
+        boolean nothingPending;
+        synchronized (reaperLock) {
+            nothingPending = taskList.isEmpty();
+        }
+        return nothingPending;
+    }
+
+    /**
+     * remove any remaining tasks from the list, destroying their process, and return a count of the
+     * number of tasks which have failed to exit cleanly. this count includes tasks which have been killed
+     * because of timeouts as well as those destroyed under the call to clear().
+     * @return a count of how many tasks exited abnormally.
+
+     */
+    public int clear()
+    {
+        int returnCount;
+        synchronized(reaperLock) {
+            // set the absolute timeout of every task to zero then wake up the reaper and wait for
+            // the list to empty
+            Iterator<TaskReapable> iterator = taskList.iterator();
+            while (iterator.hasNext()) {
+                TaskReapable reapable = iterator.next();
+                reapable.absoluteTimeout = 0;
+            }
+            reaperLock.notify();
+            // ok now wait until all the tasks have gone
+            // n.b this relies upon the caller not inserting new tasks while the clear operation is in progress!
+            while (!taskList.isEmpty()) {
+                try {
+                    reaperLock.wait();
+                } catch (InterruptedException e) {
+                    // ignore -- we should never be interrupted here
+                }
+            }
+            returnCount = invalidCount;
+            invalidCount = 0;
+        }
+        return returnCount;
+    }
+
+    /**
+     * shut down the task manager
+     * @param immediate if true then shut down without destroying any task in the current list otherwise
+     * atempt to destroy all pending tasks.
+     */
+    public void shutdown(boolean immediate)
+    {
+        // unset the current reaper instance
+
+        clearTheReaper(this);
+
+        // now ensure that any tasks it had pending are removed or time out
+
+        synchronized (reaperLock) {
+            shutdown = false;
+            // setting shutdownWait to false makes the reaper thread exit without clearing the list
+            // setting it true makes it exit once all tasks have been destroyed
+            if (immediate) {
+                shutdownWait = false;
+            } else {
+                shutdownWait = true;
+                // set the absolute timeout of every task to zero then wake up the reaper and wait for
+                // the list to empty
+                Iterator<TaskReapable> iterator = taskList.iterator();
+                while (iterator.hasNext()) {
+                    TaskReapable reapable = iterator.next();
+                    reapable.absoluteTimeout = 0;
+                }
+            }
+
+            // notify so that the reaper thread wakes up
+
+            reaperLock.notify();
+
+            // we don't get out of here until the reaper thread has exited
+            
+            while (!threadShutdown) {
+                try {
+                    reaperLock.wait();
+                } catch (InterruptedException e) {
+                    // ignore -- we should never be interrupted here
+                }
+            }
+        }
+    }
+
+    /**
+     * obtain a handle on the currently active reaper, creating a new one if there is no reaper active
+     * @return the current reaper instance
+     */
+    public static synchronized TaskReaper getReaper()
+    {
+        if (theReaper != null) {
+            return theReaper;
+        }
+
+        // no reaper is active so start a fresh one
+        createReaper();
+        return theReaper;
+    }
+
+    /**
+     * reset the current reaper handle to null. this is called from a reaper instance's shutdown
+     * method and only has an effect when the handle still refers to that instance.
+     * @param theReaperReaped the reaper instance which is being shut down
+     */
+    private static synchronized void clearTheReaper(TaskReaper theReaperReaped)
+    {
+        // a different current reaper (or none) must be left alone
+        if (theReaper != theReaperReaped) {
+            return;
+        }
+
+        theReaper = null;
+    }
+
+    // implementation methods and state
+
+    // package public access only for use by TaskReaperThread
+
+    /**
+     * entry point for the task reaper thread to detect timed out tasks in the background. this should not
+     * be called anywhere except in TaskReaperThread.run. it loops, holding the reaper lock except while
+     * waiting, until a shutdown has been flagged and then sets threadShutdown and notifies the lock.
+     */
+    void check()
+    {
+        synchronized(reaperLock) {
+            // keep going until shutdown is flagged. if shutdownWait is also set then carry on until
+            // the task list has been emptied -- without the emptiness test the loop could never end
+            // once shutdownWait had been set
+            while (!shutdown || (shutdownWait && !taskList.isEmpty())) {
+                if (taskList.isEmpty()) {
+                    // wait as long as we need to
+                    try {
+                        reaperLock.wait();
+                    } catch (InterruptedException e) {
+                        // ignore -- we should never be interrupted here
+                    }
+                } else {
+                    // the first entry in the sorted list is the one with the earliest timeout
+                    TaskReapable first = taskList.first();
+                    long absoluteTime = System.currentTimeMillis();
+                    long firstAbsoluteTime = first.getAbsoluteTimeout();
+                    if (absoluteTime < firstAbsoluteTime) {
+                        // use difference to limit wait
+                        try {
+                            reaperLock.wait(firstAbsoluteTime - absoluteTime);
+                        } catch (InterruptedException e) {
+                            // ignore -- we should never be interrupted here
+                        }
+                    } else {
+                        // we have a task to kill so kill it, wait a brief interval so we don't hog
+                        // the cpu and then loop to see if there are more to kill
+                        if (timeout(first)) {
+                            invalidCount++;
+                        }
+                        // notify here in case a thread was trying to modify the list while we were doing
+                        // the timeout
+                        reaperLock.notify();
+                        try {
+                            reaperLock.wait(1);
+                        } catch (InterruptedException e) {
+                            // ignore -- we should never be interrupted here
+                        }
+                    }
+                }
+            }
+            threadShutdown = true;
+            // notify here so we wakeup the thread which initiated the shutdown
+            reaperLock.notify();
+        }
+    }
+
+    // the currently active reaper or null if there is none
+    private static TaskReaper theReaper = null;
+    // the managed tasks, sorted using TaskReapable.compareTo
+    private SortedSet<TaskReapable> taskList;
+    // maps each inserted task to the wrapper which was added to the task list for it
+    private HashMap<TaskImpl, TaskReapable> reapableMap;
+    // incremented by check() when timeout() reports true and reset by clear()
+    private int invalidCount;
+    // tested by insert() and by the loop in check()
+    private boolean shutdown;
+    // tested by the loop in check() alongside shutdown
+    private boolean shutdownWait;
+    // set by check() once its loop has ended
+    private boolean threadShutdown;
+    // monitor used to guard and to signal changes to the state above
+    private Object reaperLock;
+    // the background thread which runs check()
+    private TaskReaperThread reaperThread;
+
+    /**
+     * set up an empty reaper and start its background thread. instances are only created via
+     * createReaper.
+     */
+    private TaskReaper()
+    {
+        this.reaperLock = new Object();
+        this.taskList = new TreeSet<TaskReapable>();
+        this.reapableMap = new HashMap<TaskImpl, TaskReapable>();
+        this.invalidCount = 0;
+        this.shutdown = false;
+        this.shutdownWait = false;
+        this.threadShutdown = false;
+        // the thread is only started once all the state it reads has been initialised
+        this.reaperThread = new TaskReaperThread(this);
+        this.reaperThread.start();
+    }
+
+    /**
+     * start the task manager by creating a new reaper and installing it as the current instance
+     */
+    private static void createReaper()
+    {
+        TaskReaper freshReaper = new TaskReaper();
+        theReaper = freshReaper;
+    }
+
+    /**
+     * drop a timed out task from both the map and the task list and then invoke the task's own
+     * timeout method. n.b. this must be called when synchronized on the reaper lock
+     * @param reapable the task to be destroyed
+     * @return the value returned by the task's timeout method, which is counted by the caller as an
+     * invalid exit when true
+     */
+    private boolean timeout(TaskReapable reapable)
+    {
+        TaskImpl expired = reapable.getTask();
+        reapableMap.remove(expired);
+        taskList.remove(reapable);
+        boolean exitedInvalidly = expired.timeout();
+        return exitedInvalidly;
+    }
+
+    /**
+     * wrapper which associates a task with its absolute timeout and provides a comparator which allows
+     * tasks to be sorted in order of absolute timeout. wrappers with the same timeout are ordered by a
+     * unique creation sequence number so that two distinct wrappers never compare inconsistently.
+     */
+    private static class TaskReapable implements Comparable<TaskReapable>
+    {
+        /**
+         * @param task the task being wrapped
+         * @param absoluteTimeout the value added to the current system time in milliseconds to obtain
+         * the stored timeout. NOTE(review): despite the name this is treated as an offset from now --
+         * confirm against the callers of TaskReaper.insert
+         */
+        public TaskReapable(TaskImpl task, long absoluteTimeout)
+        {
+            long now = System.currentTimeMillis();
+            this.absoluteTimeout = now + absoluteTimeout;
+            this.task = task;
+            this.sequence = nextSequence();
+        }
+
+        public long getAbsoluteTimeout()
+        {
+            return absoluteTimeout;
+        }
+
+        public TaskImpl getTask() {
+            return task;
+        }
+
+        // hands out a distinct number to every wrapper which is created
+        private static synchronized long nextSequence()
+        {
+            return sequenceCounter++;
+        }
+
+        private static long sequenceCounter = 0;
+
+        private long absoluteTimeout;
+        private TaskImpl task;
+        private final long sequence;
+
+        /**
+         * order by timeout first and by creation sequence second. the original tie break on hashCode
+         * answered 1 in both directions whenever two wrappers had the same hash code which breaks the
+         * ordering contract a sorted set relies upon.
+         */
+        public int compareTo(TaskReapable o) {
+            if (this == o) {
+                return 0;
+            }
+
+            if (absoluteTimeout < o.absoluteTimeout) {
+                return -1;
+            } else if (absoluteTimeout > o.absoluteTimeout) {
+                return 1;
+            } else {
+                // distinct wrappers always have distinct sequence numbers
+                return (sequence < o.sequence ? -1 : 1);
+            }
+        }
+    }
+
+    /**
+     * background thread which spends its whole life inside the check method of the reaper it was
+     * created for
+     */
+    private static class TaskReaperThread extends Thread
+    {
+        private TaskReaper reaper;
+
+        public TaskReaperThread(TaskReaper reaper)
+        {
+            this.reaper = reaper;
+        }
+
+        public void run()
+        {
+            // check only returns once the reaper has been shut down
+            reaper.check();
+        }
+    }
+}

Modified: labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TestGroupBase.java
===================================================================
--- labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TestGroupBase.java	2009-06-10 13:45:53 UTC (rev 26907)
+++ labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/TestGroupBase.java	2009-06-10 16:22:55 UTC (rev 26908)
@@ -22,7 +22,10 @@
 
 import org.junit.Before;
 import org.junit.After;
+import org.junit.Assert;
 
+import java.io.*;
+
 /**
  * Base class from which all autogenerated test suites inherit common behaviour.
  *
@@ -33,16 +36,38 @@
 
     @Before public void setUp()
 	{
-        TaskImpl.assertNoTasks();
+        // no need to do this here as it gets done in tearDown
+        // TaskImpl.cleanupTasks();
         // TODO EmptyObjectStore
 	}
 
 	@After public void tearDown()
 	{
-        TaskImpl.assertNoTasks();
+        TaskImpl.cleanupTasks();
     }
 
     public Task createTask(Class clazz, Task.TaskType taskType) {
         return new TaskImpl(clazz, taskType);
     }
+
+    /**
+     * create a task which is handed a print stream writing to the named file, creating the file's
+     * parent directory first if necessary. the test is failed if the name identifies a directory, if
+     * the parent directory cannot be created or if the file cannot be opened.
+     * @param clazz the class passed on to the TaskImpl constructor
+     * @param taskType the type of task to create
+     * @param filename the name of the file to open for the task's print stream
+     * @param timeout the timeout passed on to the TaskImpl constructor
+     * @return the new task
+     */
+    public Task createTask(Class clazz, Task.TaskType taskType, String filename, int timeout) {
+        OutputStream out;
+        try {
+            File outFile = new File(filename);
+            if (outFile.isDirectory()) {
+                Assert.fail("createTask : output file name identifies directory " + filename);
+            }
+            // a bare file name has no parent, in which case getParentFile returns null and there
+            // is no directory to create
+            File directory = outFile.getParentFile();
+            if (directory != null && !directory.exists() && !directory.mkdirs()) {
+                Assert.fail("createTask : could not create directory for file " + filename);
+            }
+            out = new FileOutputStream(outFile);
+
+            // the print stream is created with autoflush enabled
+            return new TaskImpl(clazz, taskType, new PrintStream(out, true), timeout);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("createTask : could not open output stream for file " + filename);
+            return null;
+        }
+    }
 }

Modified: labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/XML2JUnit.java
===================================================================
--- labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/XML2JUnit.java	2009-06-10 13:45:53 UTC (rev 26907)
+++ labs/jbosstm/trunk/qa/tests/src/org/jboss/jbossts/qa/junit/XML2JUnit.java	2009-06-10 16:22:55 UTC (rev 26908)
@@ -141,13 +141,10 @@
         // EmptyObjectStore is in the test base class, don't generate for it
         for(TestDefinition testDefinition : testDefs) {
             ArrayList<Action> actionList = testDefinition.getActionList();
-            for(Action action : actionList) {
-                TaskDefinition taskDefinition = getTaskDef(actionList.get(0), testDefinition);
-                if(taskDefinition == null || !taskDefinition.getClassName().equals("org.jboss.jbossts.qa.Utils.EmptyObjectStore")) {
-                    throw new IllegalArgumentException("Test does not start with EmptyObjectStore");
-                }
+            TaskDefinition taskDefinition = getTaskDef(actionList.remove(0), testDefinition);
+            if(taskDefinition == null || !taskDefinition.getClassName().equals("org.jboss.jbossts.qa.Utils.EmptyObjectStore")) {
+                throw new IllegalArgumentException("Test does not start with EmptyObjectStore");
             }
-            actionList.remove(0);
         }
 
         int startingBufferPosition = buffer.length();
@@ -166,10 +163,15 @@
 
         buffer.append("\t@After public void tearDown()\n");
         buffer.append("\t{\n");
+        // do the per server terminates inside a try block so we can guarantee to call super.tearDown in
+        // a finally block if any of them fails
+        buffer.append("\t\ttry {\n");
 
         generateCommonTasks(true, outstandingActions); // tearDown method
 
-        buffer.append("\t\tsuper.tearDown();\n");
+        buffer.append("\t\t} finally {\n");
+        buffer.append("\t\t\tsuper.tearDown();\n");
+        buffer.append("\t\t}\n");
         buffer.append("\t}\n\n");
     }
 
@@ -183,7 +185,7 @@
 
         if(terminationActions != null) {
             for(Action action : terminationActions) {
-                generateTask(action,testDefs.get(0), false);
+                generateTask(action,testDefs.get(0), true);
             }
         }
 
@@ -266,11 +268,16 @@
     public void generateTask(Action action, TestDefinition testDefinition, boolean isSetup) throws Exception {
 
         TaskDefinition taskDefinition = getTaskDef(action, testDefinition);
-
+        String outputDirectory;
+        String filename;
         switch(action.getType()) {
             case Action.PERFORM_TASK:
                 String name = (action.getAssociatedRuntimeTaskId() == null ? ("task"+(nameCount++)) : action.getAssociatedRuntimeTaskId());
-                buffer.append("\t\tTask "+name+" = createTask("+taskDefinition.getClassName()+".class, Task.TaskType."+taskDefinition.getTypeText()+");\n");
+                outputDirectory = testOutputDirectory(testDefinition, isSetup);
+                filename = testOutputFilename(testDefinition, name);
+                buffer.append("\t\tTask "+name+" = createTask(");
+                buffer.append(taskDefinition.getClassName()+".class, Task.TaskType."+taskDefinition.getTypeText());
+                buffer.append(", \"" + outputDirectory + filename + "\", 600);\n");
                 if(action.getParameterList().length != 0) {
                     buffer.append("\t\t"+name+".perform("); // new String[] {
 
@@ -294,7 +301,11 @@
                 } else {
                     buffer.append("\t\tTask ");
                 }
-                buffer.append(action.getAssociatedRuntimeTaskId()+" = createTask("+taskDefinition.getClassName()+".class, Task.TaskType."+taskDefinition.getTypeText()+");\n");
+                outputDirectory = testOutputDirectory(testDefinition, isSetup);
+                filename = testOutputFilename(testDefinition, action.getAssociatedRuntimeTaskId());
+                buffer.append(action.getAssociatedRuntimeTaskId() + " = createTask(");
+                buffer.append(taskDefinition.getClassName()+".class, Task.TaskType."+taskDefinition.getTypeText());
+                buffer.append(", \"" + outputDirectory + filename + "\", 600);\n");
                 if(action.getParameterList().length != 0) {
                     buffer.append("\t\t"+action.getAssociatedRuntimeTaskId()+".start("); // new String[] {
 
@@ -313,7 +324,11 @@
                 }
                 break;
             case Action.TERMINATE_TASK:
-                buffer.append("\t\t"+action.getAssociatedRuntimeTaskId()+".terminate();\n");
+                if (isSetup) {
+                    buffer.append("\t\t\t"+action.getAssociatedRuntimeTaskId()+".terminate();\n");
+                } else {
+                    buffer.append("\t\t"+action.getAssociatedRuntimeTaskId()+".terminate();\n");
+                }
                 break;
             case Action.WAIT_FOR_TASK:
                 buffer.append("\t\t"+action.getAssociatedRuntimeTaskId()+".waitFor();\n");
@@ -323,6 +338,29 @@
         }
     }
 
+    /**
+     * derive the name of the output file for a task by appending a fixed suffix to the task name.
+     * the test definition is currently not used.
+     */
+    private String testOutputFilename(TestDefinition testDefinition, String name) {
+        // TODO - identify a better naming scheme
+        StringBuilder filename = new StringBuilder();
+        filename.append(name);
+        filename.append("_output.txt");
+        return filename.toString();
+    }
+
+    /**
+     * derive the directory, including a trailing slash, into which a task's output file is written.
+     * setup tasks use the directory named after the test group. other tasks use a subdirectory named
+     * after the test id with any leading group id prefix stripped and dashes replaced by underscores.
+     */
+    private String testOutputDirectory(TestDefinition testDefinition, boolean isSetup) {
+        // TODO - identify a better naming scheme
+
+        String groupId = testDefinition.getGroupId();
+        String groupDirectory = "./testoutput/" + groupId + "/";
+        if (isSetup) {
+            // setup tasks write their output in the parent directory
+            return groupDirectory;
+        }
+
+        String testId = testDefinition.getId();
+        String groupPrefix = groupId + "_";
+        if (testId.startsWith(groupPrefix)) {
+            testId = testId.substring(groupPrefix.length());
+        }
+
+        return groupDirectory + testId.replace("-", "_") + "/";
+    }
+
     public void generateTest(TestDefinition testDefinition) throws Exception {
 
         if(testDefinition == null) {




More information about the jboss-svn-commits mailing list