[hornetq-commits] JBoss hornetq SVN: r9141 - in trunk: tests/src/org/hornetq/tests/integration/jms/bridge and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Apr 21 04:14:02 EDT 2010


Author: jmesnil
Date: 2010-04-21 04:14:01 -0400 (Wed, 21 Apr 2010)
New Revision: 9141

Modified:
   trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
   trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-287: cannot stop JMSBridge which is handling startup failure

* use a fixed thread pool of 3 threads to manage the JMS Bridge's concurrent tasks

Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2010-04-20 20:16:45 UTC (rev 9140)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2010-04-21 08:14:01 UTC (rev 9141)
@@ -19,6 +19,9 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -107,6 +110,8 @@
    private volatile boolean addMessageIDInHeader;
 
    private boolean started;
+   
+   private boolean stopping = false;
 
    private final LinkedList<Message> messages;
 
@@ -136,10 +141,8 @@
 
    private BatchTimeChecker timeChecker;
 
-   private Thread checkerThread;
-
-   private Thread sourceReceiver;
-
+   private ExecutorService executor;
+   
    private long batchExpiryTime;
 
    private boolean paused;
@@ -170,6 +173,7 @@
    public JMSBridgeImpl()
    {
       messages = new LinkedList<Message>();
+      executor = createExecutor();
    }
 
    public JMSBridgeImpl(final ConnectionFactoryFactory sourceCff,
@@ -305,6 +309,8 @@
 
    public synchronized void start() throws Exception
    {
+      stopping = false;
+      
       if (started)
       {
          JMSBridgeImpl.log.warn("Attempt to start, but is already started");
@@ -316,6 +322,12 @@
          JMSBridgeImpl.log.trace("Starting " + this);
       }
 
+      // bridge has been stopped and is restarted
+      if (executor.isShutdown())
+      {
+         executor = createExecutor();
+      }
+      
       checkParams();
 
       TransactionManager tm = getTm();
@@ -356,20 +368,16 @@
 
             timeChecker = new BatchTimeChecker();
 
-            checkerThread = new Thread(timeChecker, "jmsbridge-checker-thread");
-
+            executor.execute(timeChecker);
             batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
 
-            checkerThread.start();
-
             if (JMSBridgeImpl.trace)
             {
                JMSBridgeImpl.log.trace("Started time checker thread");
             }
          }
 
-         sourceReceiver = new SourceReceiver();
-         sourceReceiver.start();
+         executor.execute(new SourceReceiver());
 
          if (JMSBridgeImpl.trace)
          {
@@ -386,12 +394,8 @@
 
    public synchronized void stop() throws Exception
    {
-      if (!started)
-      {
-         JMSBridgeImpl.log.warn("Attempt to stop, but is already stopped");
-         return;
-      }
-
+      stopping = true;
+      
       if (JMSBridgeImpl.trace)
       {
          JMSBridgeImpl.log.trace("Stopping " + this);
@@ -401,50 +405,16 @@
       {
          started = false;
 
-         // This must be inside sync block
-         if (checkerThread != null)
-         {
-            checkerThread.interrupt();
-         }
-
-         if (sourceReceiver != null)
-         {
-            sourceReceiver.interrupt();
-         }
+         executor.shutdownNow();
       }
 
-      // This must be outside sync block
-      if (checkerThread != null)
+      boolean ok = executor.awaitTermination(60, TimeUnit.SECONDS);
+      
+      if(!ok)
       {
-         if (JMSBridgeImpl.trace)
-         {
-            JMSBridgeImpl.log.trace("Waiting for checker thread to finish");
-         }
-
-         checkerThread.join();
-
-         if (JMSBridgeImpl.trace)
-         {
-            JMSBridgeImpl.log.trace("Checker thread has finished");
-         }
+         throw new Exception("fail to stop JMS Bridge");
       }
 
-      // This must be outside sync block
-      if (sourceReceiver != null)
-      {
-         if (JMSBridgeImpl.trace)
-         {
-            JMSBridgeImpl.log.trace("Waiting for source receiver thread to finish");
-         }
-
-         sourceReceiver.join();
-
-         if (JMSBridgeImpl.trace)
-         {
-            JMSBridgeImpl.log.trace("Source receiver thread has finished");
-         }
-      }
-
       if (tx != null)
       {
          // Terminate any transaction
@@ -1366,7 +1336,7 @@
 
       int count = 0;
 
-      while (true)
+      while (true && !stopping)
       {
          boolean ok = setupJMSObjects();
 
@@ -1622,9 +1592,7 @@
       // In the case of onMessage we can't close the connection from inside the onMessage method
       // since it will block waiting for onMessage to complete. In the case of start we want to return
       // from the call before the connections are reestablished so that the caller is not blocked unnecessarily.
-      Thread t = new Thread(failureHandler, "jmsbridge-failurehandler-thread");
-
-      t.start();
+      executor.execute(failureHandler);
    }
 
    private void addMessageIDInHeader(final Message msg) throws Exception
@@ -1702,6 +1670,15 @@
       }
    }
 
+   /**
+    * Creates a 3-sized thread pool executor (1 thread for the sourceReceiver, 1 for the timeChecker
+    * and 1 for the eventual failureHandler)
+    */
+   private ExecutorService createExecutor()
+   {
+      return Executors.newFixedThreadPool(3);
+   }
+
    // Inner classes ---------------------------------------------------------------
 
    /**

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java	2010-04-20 20:16:45 UTC (rev 9140)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java	2010-04-21 08:14:01 UTC (rev 9141)
@@ -145,7 +145,7 @@
    /**
     * https://jira.jboss.org/jira/browse/HORNETQ-287
     */
-   public void _testStopBridgeWithFailureWhenStarted() throws Exception
+   public void testStopBridgeWithFailureWhenStarted() throws Exception
    {
       jmsServer1.stop();
 
@@ -172,39 +172,17 @@
       Assert.assertFalse(bridge.isStarted());
       Assert.assertTrue(bridge.isFailed());
 
-      assertEquals(1, numOfThreadsStartingWith("pool-"));
-      
       bridge.stop();
       Assert.assertFalse(bridge.isStarted());
 
-      assertEquals(0, numOfThreadsStartingWith("pool-"));
+      // Thread.sleep(3000);
       
       // we restart and setup the server for the test's tearDown checks
       jmsServer1.start();
       createQueue("targetQueue", 1);
       setUpAdministeredObjects();
-
    }
    
-   //TODO is there a better way to check if a thread is still running?
-   private int numOfThreadsStartingWith(String prefix)
-   {
-      int count = 0;
-      long[] threadIds = ManagementFactory.getThreadMXBean().getAllThreadIds();
-      for (long id : threadIds)
-      {
-         ThreadInfo threadInfo = ManagementFactory.getThreadMXBean().getThreadInfo(id);
-         if (threadInfo != null)
-         {
-            if (threadInfo.getThreadName().startsWith(prefix))
-            {
-               count++;
-            }
-         }
-      }
-      return count;
-   }
-   
    /*
     * Send some messages
     * Crash the destination server

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java	2010-04-20 20:16:45 UTC (rev 9140)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java	2010-04-21 08:14:01 UTC (rev 9141)
@@ -562,6 +562,92 @@
       }
    }
 
+   public void testStartStopStart() throws Exception
+   {
+      JMSBridgeImpl bridge = null;
+
+      Connection connSource = null;
+
+      Connection connTarget = null;
+
+      try
+      {
+         final int NUM_MESSAGES = 10;
+
+         bridge = new JMSBridgeImpl(cff0,
+                                    cff1,
+                                    sourceQueueFactory,
+                                    targetQueueFactory,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    null,
+                                    5000,
+                                    10,
+                                    QualityOfServiceMode.AT_MOST_ONCE,
+                                    1,
+                                    -1,
+                                    null,
+                                    null,
+                                    false);
+         bridge.setTransactionManager(newTransactionManager());
+
+         bridge.start();
+
+         bridge.stop();
+         
+         bridge.start();
+         
+         connSource = cf0.createConnection();
+
+         Session sessSend = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(sourceQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+            prod.send(tm);
+         }
+
+         connTarget = cf1.createConnection();
+         Session sessRec = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageConsumer cons = sessRec.createConsumer(targetQueue);
+
+         connTarget.start();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(10000);
+            Assert.assertNotNull(tm);
+            Assert.assertEquals("message" + i, tm.getText());
+         }
+
+         Message m = cons.receiveNoWait();
+         Assert.assertNull(m);
+      }
+      finally
+      {
+         if (connSource != null)
+         {
+            connSource.close();
+         }
+
+         if (connTarget != null)
+         {
+            connTarget.close();
+         }
+
+         if (bridge != null)
+         {
+            bridge.stop();
+         }
+
+         removeAllMessages(sourceQueue.getQueueName(), 0);
+      }
+   }
+   
    public void testSelector() throws Exception
    {
       JMSBridgeImpl bridge = null;



More information about the hornetq-commits mailing list