[hornetq-commits] JBoss hornetq SVN: r9123 - in trunk: tests/src/org/hornetq/tests/integration/jms/bridge and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Apr 15 08:35:23 EDT 2010
Author: jmesnil
Date: 2010-04-15 08:35:22 -0400 (Thu, 15 Apr 2010)
New Revision: 9123
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-287: cannot stop JMSBridge which is handling startup failure
* use an executor to run the failure handler thread
* remove 1,000,000-iteration test() from JMSBridgeReconnectionTest
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-15 10:38:36 UTC (rev 9122)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-15 12:35:22 UTC (rev 9123)
@@ -19,6 +19,9 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -50,6 +53,7 @@
import org.hornetq.jms.bridge.JMSBridgeControl;
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.utils.Future;
/**
*
@@ -63,8 +67,6 @@
*/
public class JMSBridgeImpl implements HornetQComponent, JMSBridge
{
- public static final String FAILURE_HANDLER_THREAD_NAME = "jmsbridge-failurehandler-thread";
-
private static final Logger log;
private static boolean trace;
@@ -162,14 +164,14 @@
private ObjectName objectName;
- private Thread startupFailureThread;
-
private static final int FORWARD_MODE_XA = 0;
private static final int FORWARD_MODE_LOCALTX = 1;
private static final int FORWARD_MODE_NONTX = 2;
+ private ExecutorService executor;
+
/*
* Constructor for MBean
*/
@@ -276,6 +278,8 @@
checkParams();
+ this.executor = Executors.newSingleThreadExecutor();
+
if (mbeanServer != null)
{
if (objectName != null)
@@ -364,20 +368,19 @@
timeChecker = new BatchTimeChecker();
- checkerThread = new Thread(timeChecker, "jmsbridge-checker-thread");
-
+ Thread checkerThread = new Thread(timeChecker, "jmsbridge-checker-thread");
+ checkerThread.start();
+
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
- checkerThread.start();
-
if (JMSBridgeImpl.trace)
{
JMSBridgeImpl.log.trace("Started time checker thread");
}
}
- sourceReceiver = new SourceReceiver();
- sourceReceiver.start();
+ Thread sourceThread = new SourceReceiver();
+ sourceThread.start();
if (JMSBridgeImpl.trace)
{
@@ -415,12 +418,14 @@
{
sourceReceiver.interrupt();
}
+
+ executor.shutdown();
+ boolean ok = executor.awaitTermination(2 * failureRetryInterval, TimeUnit.MILLISECONDS);
- if (startupFailureThread != null)
+ if (!ok)
{
- startupFailureThread.interrupt();
+ log.warn("Timed out waiting to stop");
}
-
}
// This must be outside sync block
@@ -454,22 +459,6 @@
JMSBridgeImpl.log.trace("Source receiver thread has finished");
}
}
-
- // This must be outside sync block
- if (startupFailureThread != null)
- {
- if (JMSBridgeImpl.trace)
- {
- JMSBridgeImpl.log.trace("Waiting for failure thread to finish");
- }
-
- startupFailureThread.join();
-
- if (JMSBridgeImpl.trace)
- {
- JMSBridgeImpl.log.trace("Failure thread has finished");
- }
- }
if (tx != null)
{
@@ -1637,10 +1626,10 @@
private void handleFailureOnStartup()
{
- startupFailureThread = handleFailure(new StartupFailureHandler());
+ handleFailure(new StartupFailureHandler());
}
- private Thread handleFailure(final Runnable failureHandler)
+ private void handleFailure(final Runnable failureHandler)
{
failed = true;
@@ -1648,11 +1637,7 @@
// In the case of onMessage we can't close the connection from inside the onMessage method
// since it will block waiting for onMessage to complete. In the case of start we want to return
// from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
- Thread t = new Thread(failureHandler, FAILURE_HANDLER_THREAD_NAME);
-
- t.start();
-
- return t;
+ executor.execute(new Thread(failureHandler, "jmsbridge-thread-pool"));
}
private void addMessageIDInHeader(final Message msg) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-04-15 10:38:36 UTC (rev 9122)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-04-15 12:35:22 UTC (rev 9123)
@@ -12,11 +12,8 @@
*/
package org.hornetq.tests.integration.jms.bridge;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
-import java.util.Map;
import junit.framework.Assert;
@@ -145,19 +142,6 @@
}
}
- public void test() throws Exception
- {
- for (int i = 0; i < 1000000; i++)
- {
- log.info("** ITER "+ i);
-
- this.testStopBridgeWithFailureWhenStarted();
- tearDown();
-
- setUp();
- }
- }
-
/**
* https://jira.jboss.org/jira/browse/HORNETQ-287
*/
@@ -188,14 +172,13 @@
Assert.assertFalse(bridge.isStarted());
Assert.assertTrue(bridge.isFailed());
- assertTrue(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
+ assertEquals(1, numOfThreadsStartingWith("pool-"));
bridge.stop();
-
- assertFalse(threadExists(JMSBridgeImpl.FAILURE_HANDLER_THREAD_NAME));
-
Assert.assertFalse(bridge.isStarted());
+ assertEquals(0, numOfThreadsStartingWith("pool-"));
+
// we restart and setup the server for the test's tearDown checks
jmsServer1.start();
createQueue("targetQueue", 1);
@@ -203,18 +186,23 @@
}
- private boolean threadExists(String threadName)
+ //TODO is there a better way to check if a thread is still running?
+ private int numOfThreadsStartingWith(String prefix)
{
+ int count = 0;
long[] threadIds = ManagementFactory.getThreadMXBean().getAllThreadIds();
for (long id : threadIds)
{
ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(id);
- if (threadInfo.getThreadName().equals(threadName))
+ if (threadInfo != null)
{
- return true;
+ if (threadInfo.getThreadName().startsWith(prefix))
+ {
+ count++;
+ }
}
}
- return false;
+ return count;
}
/*
More information about the hornetq-commits mailing list