[hornetq-commits] JBoss hornetq SVN: r9141 - in trunk: tests/src/org/hornetq/tests/integration/jms/bridge and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Apr 21 04:14:02 EDT 2010
Author: jmesnil
Date: 2010-04-21 04:14:01 -0400 (Wed, 21 Apr 2010)
New Revision: 9141
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-287: cannot stop JMSBridge which is handling startup failure
* use a 3-sized fixed thread pool to manage JMS Bridge concurrent tasks
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-20 20:16:45 UTC (rev 9140)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2010-04-21 08:14:01 UTC (rev 9141)
@@ -19,6 +19,9 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -107,6 +110,8 @@
private volatile boolean addMessageIDInHeader;
private boolean started;
+
+ private boolean stopping = false;
private final LinkedList<Message> messages;
@@ -136,10 +141,8 @@
private BatchTimeChecker timeChecker;
- private Thread checkerThread;
-
- private Thread sourceReceiver;
-
+ private ExecutorService executor;
+
private long batchExpiryTime;
private boolean paused;
@@ -170,6 +173,7 @@
public JMSBridgeImpl()
{
messages = new LinkedList<Message>();
+ executor = createExecutor();
}
public JMSBridgeImpl(final ConnectionFactoryFactory sourceCff,
@@ -305,6 +309,8 @@
public synchronized void start() throws Exception
{
+ stopping = false;
+
if (started)
{
JMSBridgeImpl.log.warn("Attempt to start, but is already started");
@@ -316,6 +322,12 @@
JMSBridgeImpl.log.trace("Starting " + this);
}
+ // bridge has been stopped and is restarted
+ if (executor.isShutdown())
+ {
+ executor = createExecutor();
+ }
+
checkParams();
TransactionManager tm = getTm();
@@ -356,20 +368,16 @@
timeChecker = new BatchTimeChecker();
- checkerThread = new Thread(timeChecker, "jmsbridge-checker-thread");
-
+ executor.execute(timeChecker);
batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
- checkerThread.start();
-
if (JMSBridgeImpl.trace)
{
JMSBridgeImpl.log.trace("Started time checker thread");
}
}
- sourceReceiver = new SourceReceiver();
- sourceReceiver.start();
+ executor.execute(new SourceReceiver());
if (JMSBridgeImpl.trace)
{
@@ -386,12 +394,8 @@
public synchronized void stop() throws Exception
{
- if (!started)
- {
- JMSBridgeImpl.log.warn("Attempt to stop, but is already stopped");
- return;
- }
-
+ stopping = true;
+
if (JMSBridgeImpl.trace)
{
JMSBridgeImpl.log.trace("Stopping " + this);
@@ -401,50 +405,16 @@
{
started = false;
- // This must be inside sync block
- if (checkerThread != null)
- {
- checkerThread.interrupt();
- }
-
- if (sourceReceiver != null)
- {
- sourceReceiver.interrupt();
- }
+ executor.shutdownNow();
}
- // This must be outside sync block
- if (checkerThread != null)
+ boolean ok = executor.awaitTermination(60, TimeUnit.SECONDS);
+
+ if(!ok)
{
- if (JMSBridgeImpl.trace)
- {
- JMSBridgeImpl.log.trace("Waiting for checker thread to finish");
- }
-
- checkerThread.join();
-
- if (JMSBridgeImpl.trace)
- {
- JMSBridgeImpl.log.trace("Checker thread has finished");
- }
+ throw new Exception("fail to stop JMS Bridge");
}
- // This must be outside sync block
- if (sourceReceiver != null)
- {
- if (JMSBridgeImpl.trace)
- {
- JMSBridgeImpl.log.trace("Waiting for source receiver thread to finish");
- }
-
- sourceReceiver.join();
-
- if (JMSBridgeImpl.trace)
- {
- JMSBridgeImpl.log.trace("Source receiver thread has finished");
- }
- }
-
if (tx != null)
{
// Terminate any transaction
@@ -1366,7 +1336,7 @@
int count = 0;
- while (true)
+ while (true && !stopping)
{
boolean ok = setupJMSObjects();
@@ -1622,9 +1592,7 @@
// In the case of onMessage we can't close the connection from inside the onMessage method
// since it will block waiting for onMessage to complete. In the case of start we want to return
// from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
- Thread t = new Thread(failureHandler, "jmsbridge-failurehandler-thread");
-
- t.start();
+ executor.execute(failureHandler);
}
private void addMessageIDInHeader(final Message msg) throws Exception
@@ -1702,6 +1670,15 @@
}
}
+ /**
+ * Creates a 3-sized thred pool executor (1 thread for the sourceReceiver, 1 for the timeChecker
+ * and 1 for the eventual failureHandler)
+ */
+ private ExecutorService createExecutor()
+ {
+ return Executors.newFixedThreadPool(3);
+ }
+
// Inner classes ---------------------------------------------------------------
/**
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-04-20 20:16:45 UTC (rev 9140)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2010-04-21 08:14:01 UTC (rev 9141)
@@ -145,7 +145,7 @@
/**
* https://jira.jboss.org/jira/browse/HORNETQ-287
*/
- public void _testStopBridgeWithFailureWhenStarted() throws Exception
+ public void testStopBridgeWithFailureWhenStarted() throws Exception
{
jmsServer1.stop();
@@ -172,39 +172,17 @@
Assert.assertFalse(bridge.isStarted());
Assert.assertTrue(bridge.isFailed());
- assertEquals(1, numOfThreadsStartingWith("pool-"));
-
bridge.stop();
Assert.assertFalse(bridge.isStarted());
- assertEquals(0, numOfThreadsStartingWith("pool-"));
+ // Thread.sleep(3000);
// we restart and setup the server for the test's tearDown checks
jmsServer1.start();
createQueue("targetQueue", 1);
setUpAdministeredObjects();
-
}
- //TODO is there a better way to check if a thread is still running?
- private int numOfThreadsStartingWith(String prefix)
- {
- int count = 0;
- long[] threadIds = ManagementFactory.getThreadMXBean().getAllThreadIds();
- for (long id : threadIds)
- {
- ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(id);
- if (threadInfo != null)
- {
- if (threadInfo.getThreadName().startsWith(prefix))
- {
- count++;
- }
- }
- }
- return count;
- }
-
/*
* Send some messages
* Crash the destination server
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2010-04-20 20:16:45 UTC (rev 9140)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2010-04-21 08:14:01 UTC (rev 9141)
@@ -562,6 +562,92 @@
}
}
+ public void testStartStopStart() throws Exception
+ {
+ JMSBridgeImpl bridge = null;
+
+ Connection connSource = null;
+
+ Connection connTarget = null;
+
+ try
+ {
+ final int NUM_MESSAGES = 10;
+
+ bridge = new JMSBridgeImpl(cff0,
+ cff1,
+ sourceQueueFactory,
+ targetQueueFactory,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 5000,
+ 10,
+ QualityOfServiceMode.AT_MOST_ONCE,
+ 1,
+ -1,
+ null,
+ null,
+ false);
+ bridge.setTransactionManager(newTransactionManager());
+
+ bridge.start();
+
+ bridge.stop();
+
+ bridge.start();
+
+ connSource = cf0.createConnection();
+
+ Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sessSend.createTextMessage("message" + i);
+ prod.send(tm);
+ }
+
+ connTarget = cf1.createConnection();
+ Session sessRec = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer cons = sessRec.createConsumer(targetQueue);
+
+ connTarget.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(10000);
+ Assert.assertNotNull(tm);
+ Assert.assertEquals("message" + i, tm.getText());
+ }
+
+ Message m = cons.receiveNoWait();
+ Assert.assertNull(m);
+ }
+ finally
+ {
+ if (connSource != null)
+ {
+ connSource.close();
+ }
+
+ if (connTarget != null)
+ {
+ connTarget.close();
+ }
+
+ if (bridge != null)
+ {
+ bridge.stop();
+ }
+
+ removeAllMessages(sourceQueue.getQueueName(), 0);
+ }
+ }
+
public void testSelector() throws Exception
{
JMSBridgeImpl bridge = null;
More information about the hornetq-commits
mailing list