Author: jmesnil
Date: 2010-04-15 05:10:04 -0400 (Thu, 15 Apr 2010)
New Revision: 9119
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-287: cannot stop JMSBridge which is handling
startup failure
* mark the JMSBridge "stopping" attribute as volatile
* make sure the failure handler is stopped before JMSBridge.stop() returns
* fix test to check for the failure handler thread's existence before/after calling
JMSBridge.stop()
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-14 18:35:54 UTC
(rev 9118)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-15 09:10:04 UTC
(rev 9119)
@@ -63,6 +63,8 @@
*/
public class JMSBridgeImpl implements HornetQComponent, JMSBridge
{
+ public static final String FAILURE_HANDLER_THREAD_NAME =
"jmsbridge-failurehandler-thread";
+
private static final Logger log;
private static boolean trace;
@@ -108,7 +110,7 @@
private boolean started;
- private boolean stopping = false;
+ private volatile boolean stopping = false;
private final LinkedList<Message> messages;
@@ -160,6 +162,8 @@
private ObjectName objectName;
+ private Thread startupFailureThread;
+
private static final int FORWARD_MODE_XA = 0;
private static final int FORWARD_MODE_LOCALTX = 1;
@@ -392,12 +396,6 @@
{
stopping = true;
- if (!started)
- {
- JMSBridgeImpl.log.warn("Attempt to stop, but is already stopped");
- return;
- }
-
if (JMSBridgeImpl.trace)
{
JMSBridgeImpl.log.trace("Stopping " + this);
@@ -417,6 +415,12 @@
{
sourceReceiver.interrupt();
}
+
+ if (startupFailureThread != null)
+ {
+ startupFailureThread.interrupt();
+ }
+
}
// This must be outside sync block
@@ -451,6 +455,22 @@
}
}
+ // This must be outside sync block
+ if (startupFailureThread != null)
+ {
+ if (JMSBridgeImpl.trace)
+ {
+ JMSBridgeImpl.log.trace("Waiting for failure thread to finish");
+ }
+
+ startupFailureThread.join();
+
+ if (JMSBridgeImpl.trace)
+ {
+ JMSBridgeImpl.log.trace("Failure thread has finished");
+ }
+ }
+
if (tx != null)
{
// Terminate any transaction
@@ -1617,10 +1637,10 @@
private void handleFailureOnStartup()
{
- handleFailure(new StartupFailureHandler());
+ startupFailureThread = handleFailure(new StartupFailureHandler());
}
- private void handleFailure(final Runnable failureHandler)
+ private Thread handleFailure(final Runnable failureHandler)
{
failed = true;
@@ -1628,9 +1648,11 @@
// In the case of onMessage we can't close the connection from inside the
onMessage method
// since it will block waiting for onMessage to complete. In the case of start we
want to return
// from the call before the connections are reestablished so that the caller is not
blocked unnecessarily.
- Thread t = new Thread(failureHandler,
"jmsbridge-failurehandler-thread");
+ Thread t = new Thread(failureHandler, FAILURE_HANDLER_THREAD_NAME);
t.start();
+
+ return t;
}
private void addMessageIDInHeader(final Message msg) throws Exception
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-04-14
18:35:54 UTC (rev 9118)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-04-15
09:10:04 UTC (rev 9119)
@@ -12,7 +12,11 @@
*/
package org.hornetq.tests.integration.jms.bridge;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.util.Map;
import junit.framework.Assert;
@@ -148,8 +152,6 @@
{
jmsServer1.stop();
- long failureRetryInterval = 500;
-
JMSBridgeImpl bridge = new JMSBridgeImpl(cff0,
cff1,
sourceQueueFactory,
@@ -173,13 +175,12 @@
Assert.assertFalse(bridge.isStarted());
Assert.assertTrue(bridge.isFailed());
- int numThreads = ManagementFactory.getThreadMXBean().getThreadCount();
+ assertTrue(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
bridge.stop();
- Thread.sleep(failureRetryInterval * 2);
- // the JMS Brigde failure handler thread must have been stopped at most 1
failureRetryInterval ms after the bridge is stopped
- assertEquals(numThreads - 1,
ManagementFactory.getThreadMXBean().getThreadCount());
+ assertFalse(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
+
Assert.assertFalse(bridge.isStarted());
// we restart and setup the server for the test's tearDown checks
@@ -189,6 +190,20 @@
}
+ private boolean threadExists(String threadName)
+ {
+ long[] threadIds = ManagementFactory.getThreadMXBean().getAllThreadIds();
+ for (long id : threadIds)
+ {
+ ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(id);
+ if (threadInfo.getThreadName().equals(threadName))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
/*
* Send some messages
* Crash the destination server