[hornetq-commits] JBoss hornetq SVN: r9123 - in trunk: tests/src/org/hornetq/tests/integration/jms/bridge and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Apr 15 08:35:23 EDT 2010


Author: jmesnil
Date: 2010-04-15 08:35:22 -0400 (Thu, 15 Apr 2010)
New Revision: 9123

Modified:
   trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
   trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-287: cannot stop JMSBridge which is handling startup failure

* use an executor to run the failure handler thread
* remove 10K-iteration test() from JMSBridgeReconnectionTest


Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2010-04-15 10:38:36 UTC (rev 9122)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2010-04-15 12:35:22 UTC (rev 9123)
@@ -19,6 +19,9 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -50,6 +53,7 @@
 import org.hornetq.jms.bridge.JMSBridgeControl;
 import org.hornetq.jms.bridge.QualityOfServiceMode;
 import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.utils.Future;
 
 /**
  * 
@@ -63,8 +67,6 @@
  */
 public class JMSBridgeImpl implements HornetQComponent, JMSBridge
 {
-   public static final String FAILURE_HANDLER_THREAD_NAME = "jmsbridge-failurehandler-thread";
-
    private static final Logger log;
 
    private static boolean trace;
@@ -162,14 +164,14 @@
 
    private ObjectName objectName;
 
-   private Thread startupFailureThread;
-
    private static final int FORWARD_MODE_XA = 0;
 
    private static final int FORWARD_MODE_LOCALTX = 1;
 
    private static final int FORWARD_MODE_NONTX = 2;
 
+   private ExecutorService executor;
+   
    /*
     * Constructor for MBean
     */
@@ -276,6 +278,8 @@
 
       checkParams();
 
+      this.executor = Executors.newSingleThreadExecutor();
+      
       if (mbeanServer != null)
       {
          if (objectName != null)
@@ -364,20 +368,19 @@
 
             timeChecker = new BatchTimeChecker();
 
-            checkerThread = new Thread(timeChecker, "jmsbridge-checker-thread");
-
+            Thread checkerThread = new Thread(timeChecker, "jmsbridge-checker-thread");
+            checkerThread.start();
+            
             batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
 
-            checkerThread.start();
-
             if (JMSBridgeImpl.trace)
             {
                JMSBridgeImpl.log.trace("Started time checker thread");
             }
          }
 
-         sourceReceiver = new SourceReceiver();
-         sourceReceiver.start();
+         Thread sourceThread = new SourceReceiver();
+         sourceThread.start();
 
          if (JMSBridgeImpl.trace)
          {
@@ -415,12 +418,14 @@
          {
             sourceReceiver.interrupt();
          }
+
+         executor.shutdown();
+         boolean ok = executor.awaitTermination(2 * failureRetryInterval, TimeUnit.MILLISECONDS);
          
-         if (startupFailureThread != null)
+         if (!ok)
          {
-            startupFailureThread.interrupt();
+            log.warn("Timed out waiting to stop");
          }
-         
       }
 
       // This must be outside sync block
@@ -454,22 +459,6 @@
             JMSBridgeImpl.log.trace("Source receiver thread has finished");
          }
       }
-
-      // This must be outside sync block
-      if (startupFailureThread != null)
-      {
-         if (JMSBridgeImpl.trace)
-         {
-            JMSBridgeImpl.log.trace("Waiting for failure thread to finish");
-         }
-
-         startupFailureThread.join();
-
-         if (JMSBridgeImpl.trace)
-         {
-            JMSBridgeImpl.log.trace("Failure thread has finished");
-         }
-      }
       
       if (tx != null)
       {
@@ -1637,10 +1626,10 @@
 
    private void handleFailureOnStartup()
    {
-      startupFailureThread = handleFailure(new StartupFailureHandler());
+      handleFailure(new StartupFailureHandler());
    }
 
-   private Thread handleFailure(final Runnable failureHandler)
+   private void handleFailure(final Runnable failureHandler)
    {
       failed = true;
 
@@ -1648,11 +1637,7 @@
       // In the case of onMessage we can't close the connection from inside the onMessage method
       // since it will block waiting for onMessage to complete. In the case of start we want to return
       // from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
-      Thread t = new Thread(failureHandler, FAILURE_HANDLER_THREAD_NAME);
-
-      t.start();
-      
-      return t;
+      executor.execute(new Thread(failureHandler, "jmsbridge-thread-pool"));
    }
 
    private void addMessageIDInHeader(final Message msg) throws Exception

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java	2010-04-15 10:38:36 UTC (rev 9122)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java	2010-04-15 12:35:22 UTC (rev 9123)
@@ -12,11 +12,8 @@
  */
 package org.hornetq.tests.integration.jms.bridge;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
-import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -145,19 +142,6 @@
       }
    }
    
-   public void test() throws Exception
-   {
-      for (int i = 0; i < 1000000; i++)
-      {
-         log.info("** ITER "+  i);
-         
-         this.testStopBridgeWithFailureWhenStarted();
-         tearDown();
-         
-         setUp();
-      }
-   }
-
    /**
     * https://jira.jboss.org/jira/browse/HORNETQ-287
     */
@@ -188,14 +172,13 @@
       Assert.assertFalse(bridge.isStarted());
       Assert.assertTrue(bridge.isFailed());
 
-      assertTrue(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
+      assertEquals(1, numOfThreadsStartingWith("pool-"));
       
       bridge.stop();
-
-      assertFalse(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
-
       Assert.assertFalse(bridge.isStarted());
 
+      assertEquals(0, numOfThreadsStartingWith("pool-"));
+      
       // we restart and setup the server for the test's tearDown checks
       jmsServer1.start();
       createQueue("targetQueue", 1);
@@ -203,18 +186,23 @@
 
    }
    
-   private boolean threadExists(String threadName)
+   //TODO is there a better way to check if a thread is still running?
+   private int numOfThreadsStartingWith(String prefix)
    {
+      int count = 0;
       long[] threadIds = ManagementFactory.getThreadMXBean().getAllThreadIds();
       for (long id : threadIds)
       {
          ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(id);
-         if (threadInfo.getThreadName().equals(threadName))
+         if (threadInfo != null)
          {
-            return true;
+            if (threadInfo.getThreadName().startsWith(prefix))
+            {
+               count++;
+            }
          }
       }
-      return false;
+      return count;
    }
    
    /*



More information about the hornetq-commits mailing list