[hornetq-commits] JBoss hornetq SVN: r9119 - in trunk: tests/src/org/hornetq/tests/integration/jms/bridge and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Apr 15 05:10:05 EDT 2010


Author: jmesnil
Date: 2010-04-15 05:10:04 -0400 (Thu, 15 Apr 2010)
New Revision: 9119

Modified:
   trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
   trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-287: cannot stop JMSBridge which is handling startup failure

* flag JMSBridge stopping attribute as volatile
* make sure the failure handler is stopped before JMSBridge.stop() returns
* fix test to check for the failure handler thread existence before/after calling JMSBrige.stop()

Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2010-04-14 18:35:54 UTC (rev 9118)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2010-04-15 09:10:04 UTC (rev 9119)
@@ -63,6 +63,8 @@
  */
 public class JMSBridgeImpl implements HornetQComponent, JMSBridge
 {
+   public static final String FAILURE_HANDLER_THREAD_NAME = "jmsbridge-failurehandler-thread";
+
    private static final Logger log;
 
    private static boolean trace;
@@ -108,7 +110,7 @@
 
    private boolean started;
 
-   private boolean stopping = false;
+   private volatile boolean stopping = false;
 
    private final LinkedList<Message> messages;
 
@@ -160,6 +162,8 @@
 
    private ObjectName objectName;
 
+   private Thread startupFailureThread;
+
    private static final int FORWARD_MODE_XA = 0;
 
    private static final int FORWARD_MODE_LOCALTX = 1;
@@ -392,12 +396,6 @@
    {
       stopping = true;
 
-      if (!started)
-      {
-         JMSBridgeImpl.log.warn("Attempt to stop, but is already stopped");
-         return;
-      }
-
       if (JMSBridgeImpl.trace)
       {
          JMSBridgeImpl.log.trace("Stopping " + this);
@@ -417,6 +415,12 @@
          {
             sourceReceiver.interrupt();
          }
+         
+         if (startupFailureThread != null)
+         {
+            startupFailureThread.interrupt();
+         }
+         
       }
 
       // This must be outside sync block
@@ -451,6 +455,22 @@
          }
       }
 
+      // This must be outside sync block
+      if (startupFailureThread != null)
+      {
+         if (JMSBridgeImpl.trace)
+         {
+            JMSBridgeImpl.log.trace("Waiting for failure thread to finish");
+         }
+
+         startupFailureThread.join();
+
+         if (JMSBridgeImpl.trace)
+         {
+            JMSBridgeImpl.log.trace("Failure thread has finished");
+         }
+      }
+      
       if (tx != null)
       {
          // Terminate any transaction
@@ -1617,10 +1637,10 @@
 
    private void handleFailureOnStartup()
    {
-      handleFailure(new StartupFailureHandler());
+      startupFailureThread = handleFailure(new StartupFailureHandler());
    }
 
-   private void handleFailure(final Runnable failureHandler)
+   private Thread handleFailure(final Runnable failureHandler)
    {
       failed = true;
 
@@ -1628,9 +1648,11 @@
       // In the case of onMessage we can't close the connection from inside the onMessage method
       // since it will block waiting for onMessage to complete. In the case of start we want to return
       // from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
-      Thread t = new Thread(failureHandler, "jmsbridge-failurehandler-thread");
+      Thread t = new Thread(failureHandler, FAILURE_HANDLER_THREAD_NAME);
 
       t.start();
+      
+      return t;
    }
 
    private void addMessageIDInHeader(final Message msg) throws Exception

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java	2010-04-14 18:35:54 UTC (rev 9118)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java	2010-04-15 09:10:04 UTC (rev 9119)
@@ -12,7 +12,11 @@
  */
 package org.hornetq.tests.integration.jms.bridge;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.util.Map;
 
 import junit.framework.Assert;
 
@@ -148,8 +152,6 @@
    {
       jmsServer1.stop();
 
-      long failureRetryInterval = 500;
-      
       JMSBridgeImpl bridge = new JMSBridgeImpl(cff0,
                                                cff1,
                                                sourceQueueFactory,
@@ -173,13 +175,12 @@
       Assert.assertFalse(bridge.isStarted());
       Assert.assertTrue(bridge.isFailed());
 
-      int numThreads = ManagementFactory.getThreadMXBean().getThreadCount();
+      assertTrue(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
       
       bridge.stop();
-      Thread.sleep(failureRetryInterval * 2);
 
-      // the JMS Brigde failure handler thread must have been stopped at most 1 failureRetryInterval ms after the bridge is stopped
-      assertEquals(numThreads - 1, ManagementFactory.getThreadMXBean().getThreadCount());
+      assertFalse(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
+
       Assert.assertFalse(bridge.isStarted());
 
       // we restart and setup the server for the test's tearDown checks
@@ -189,6 +190,20 @@
 
    }
    
+   private boolean threadExists(String threadName)
+   {
+      long[] threadIds = ManagementFactory.getThreadMXBean().getAllThreadIds();
+      for (long id : threadIds)
+      {
+         ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(id);
+         if (threadInfo.getThreadName().equals(threadName))
+         {
+            return true;
+         }
+      }
+      return false;
+   }
+   
    /*
     * Send some messages
     * Crash the destination server



More information about the hornetq-commits mailing list