[jboss-cvs] JBoss Messaging SVN: r4455 - in branches/Branch_Experimental_JBMESSAGING_1356: tests/src/org/jboss/test/messaging/util and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 12 16:37:30 EDT 2008


Author: ovidiu.feodorov at jboss.com
Date: 2008-06-12 16:37:29 -0400 (Thu, 12 Jun 2008)
New Revision: 4455

Modified:
   branches/Branch_Experimental_JBMESSAGING_1356/src/main/org/jboss/messaging/util/PooledSerialExecutor.java
   branches/Branch_Experimental_JBMESSAGING_1356/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java
   branches/Branch_Experimental_JBMESSAGING_1356/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1356: finished the implementation of SerialExecutor.shutdownNow() for all implementing classes

Modified: branches/Branch_Experimental_JBMESSAGING_1356/src/main/org/jboss/messaging/util/PooledSerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356/src/main/org/jboss/messaging/util/PooledSerialExecutor.java	2008-06-12 20:16:23 UTC (rev 4454)
+++ branches/Branch_Experimental_JBMESSAGING_1356/src/main/org/jboss/messaging/util/PooledSerialExecutor.java	2008-06-12 20:37:29 UTC (rev 4455)
@@ -37,6 +37,7 @@
     private PooledExecutor delegate;
     private boolean taskExecuting;
     private volatile Thread currentlyExecuting;
+    private boolean shuttingDown;
 
     // Constructors --------------------------------------------------------------------------------
 
@@ -99,10 +100,31 @@
         }
     }
 
-    public void shutdownNow()
+    public synchronized void shutdownNow()
     {
-        // thread.interrupt
-        throw new RuntimeException("NOT YET IMPLEMENTED");
+        if (shuttingDown)
+        {
+            return;
+        }
+
+        shuttingDown = true;
+
+        if (taskExecuting && currentlyExecuting != null)
+        {
+            currentlyExecuting.interrupt();
+        }
+
+        // clear the rest of the tasks
+        try
+        {
+            while(taskQueue.poll(0) != null)
+            {
+            }
+        }
+        catch(InterruptedException e)
+        {
+            log.warn(e);
+        }
     }
 
     public Thread getExecutorThread()
@@ -180,6 +202,11 @@
                     taskExecuting = false;
                     currentlyExecuting = null;
 
+                    if (shuttingDown)
+                    {
+                        return;
+                    }
+
                     while(true)
                     {
                         try

Modified: branches/Branch_Experimental_JBMESSAGING_1356/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java	2008-06-12 20:16:23 UTC (rev 4454)
+++ branches/Branch_Experimental_JBMESSAGING_1356/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java	2008-06-12 20:37:29 UTC (rev 4455)
@@ -120,7 +120,7 @@
 
     public void testStackOverflow() throws Exception
     {
-        fail("I know already this fails, fix it");
+        fail("I KNOW THIS TEST FAILS, MUST FIX THE IMPLEMENATION");
 
         // flood a one-thread executor with tasks
         SerialExecutor se = new PooledSerialExecutor(100000, 1);

Modified: branches/Branch_Experimental_JBMESSAGING_1356/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java	2008-06-12 20:16:23 UTC (rev 4454)
+++ branches/Branch_Experimental_JBMESSAGING_1356/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java	2008-06-12 20:37:29 UTC (rev 4455)
@@ -311,9 +311,35 @@
 
     public void testShutdownNow() throws Exception
     {
-        throw new Exception("NOT YET IMPLEMENTED");
+        SerialExecutor se = getSerialExecutorToTest();
+
+        se.shutdownNow();
+
+        // make sure no new tasks go through
+        Task a = new Task("A");
+        se.execute(a);
+        assertFalse(a.isCompleted(1000));
     }
 
+    public void testShutdownNow2() throws Exception
+    {
+        SerialExecutor se = getSerialExecutorToTest();
+
+        Task a = new Task("A");
+        se.execute(a);
+
+        Thread.sleep(300);
+
+        se.shutdownNow();
+
+        assertFalse(a.isCompleted(300));
+
+        // make sure no new tasks go through
+        Task b = new Task("B");
+        se.execute(b);
+        assertFalse(b.isCompleted(500));
+    }
+
     public void testGetExecutorThread() throws Exception
     {
         SerialExecutor se = getSerialExecutorToTest();
@@ -461,19 +487,15 @@
 
         public void run()
         {
-
-            while(true)
+            try
             {
-                try
-                {
-                    executionEnabledLatch.acquire();
-                    break;
-                }
-                catch(InterruptedException e)
-                {
-                    log.debug(e);
-                }
+                executionEnabledLatch.acquire();
             }
+            catch(InterruptedException e)
+            {
+                log.warn("interrupted when acquiring executionEnabledLatch, exiting", e);
+                return;
+            }
 
             log.info(this + " executing ...");
 
@@ -484,18 +506,15 @@
                 executionRecord.add(getName());
             }
 
-            while(true)
+            try
             {
-                try
-                {
-                    executionCompletionEnableddLatch.acquire();
-                    break;
-                }
-                catch(InterruptedException e)
-                {
-                    log.warn("failed to acquire executionCompletionEnableddLatch, retrying ...", e);
-                }
+                executionCompletionEnableddLatch.acquire();
             }
+            catch(InterruptedException e)
+            {
+                log.warn("interrupted when acquiring executionCompletionEnableddLatch, exiting", e);
+                return;
+            }
 
             executionCompletedLatch.release();
         }




More information about the jboss-cvs-commits mailing list