[hornetq-commits] JBoss hornetq SVN: r7969 - trunk/src/main/org/hornetq/jms/bridge/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 18 08:17:48 EDT 2009


Author: jmesnil
Date: 2009-09-18 08:17:47 -0400 (Fri, 18 Sep 2009)
New Revision: 7969

Modified:
   trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
Log:
HORNETQ-27: Race condition in JMS Bridge between enlisting the XAResource in the TX and message delivery

* replace the source's MessageListener with a thread that polls the source consumer, so that
  there is no race condition between tx resource enlistment and message delivery

Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2009-09-17 22:02:17 UTC (rev 7968)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java	2009-09-18 12:17:47 UTC (rev 7969)
@@ -13,13 +13,12 @@
 
 package org.hornetq.jms.bridge.impl;
 
+import java.lang.reflect.Method;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -48,7 +47,6 @@
 import org.hornetq.jms.bridge.QualityOfServiceMode;
 import org.hornetq.jms.client.HornetQMessage;
 import org.hornetq.jms.client.HornetQSession;
-import org.jboss.tm.TransactionManagerLocator;
 
 /**
  * 
@@ -278,6 +276,9 @@
             if (trace) { log.trace("Started time checker thread"); }
          }            
          
+         Thread receiver = new SourceReceiver();
+         receiver.start();
+         
          if (trace) { log.trace("Started " + this); }
       }
       else
@@ -285,6 +286,7 @@
          log.warn("Failed to start bridge");
          handleFailureOnStartup();
       }
+      
    }
    
    public synchronized void stop() throws Exception
@@ -1027,8 +1029,6 @@
          }
          
          targetProducer = sess.createProducer(null);
-                          
-         sourceConsumer.setMessageListener(new SourceListener());
          
          return true;
       }
@@ -1414,6 +1414,79 @@
    
    // Inner classes ---------------------------------------------------------------
    
+
+   /**
+    * Thread that polls sourceConsumer with timed receive() calls instead of using a
+    * MessageListener, so that message delivery does not happen concurrently with
+    * transaction enlistment of the XAResource (see HORNETQ-27). Each loop iteration runs
+    * while holding lock; received messages are added to messages and sent via sendBatch().
+    */
+   private final class SourceReceiver extends Thread
+   {
+      @Override
+      public void run()
+      {
+         while(isStarted()) // keep polling until isStarted() returns false
+         {
+            synchronized (lock)
+            {
+               if (isPaused() || failed) // paused or failed: idle instead of receiving
+               {
+                  try
+                  {
+                     lock.wait(500); // timed wait on lock, then re-check the bridge state
+                  }
+                  catch (InterruptedException e) // interruption is only trace-logged; the loop goes on
+                  {
+                     if (trace) { log.trace(this + " thread was interrupted"); }
+                  }
+                  continue;
+               }
+
+               Message msg = null;
+               try
+               {
+                  msg = sourceConsumer.receive(1000); // timed receive, executed while holding lock
+               }
+               catch (JMSException jmse) // only trace-logged; msg stays null and is handled below
+               {
+                  if (trace) { log.trace(this + " exception while receiving a message", jmse); }
+               }
+
+               if (msg == null) // nothing received (or receive() threw): timed wait, then retry
+               {
+                  try
+                  {
+                     lock.wait(500);
+                  }
+                  catch (InterruptedException e)
+                  {
+                     if (trace) { log.trace(this + " thread was interrupted"); }
+                  }
+                  continue;
+               }
+
+               if (trace) { log.trace(this + " received message " + msg); }
+
+               messages.add(msg); // append to the pending batch
+
+               batchExpiryTime = System.currentTimeMillis() + maxBatchTime; // every received message moves the expiry to now + maxBatchTime
+
+               if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
+
+               if (maxBatchSize != -1 && messages.size() >= maxBatchSize) // with maxBatchSize == -1 this size check never fires
+               {
+                  if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
+
+                  sendBatch();
+
+                  if (trace) { log.trace(this + " sent batch"); }
+               }                        
+            }
+         }         
+      }
+   }
+
    private class FailureHandler implements Runnable
    {
       /**
@@ -1575,40 +1648,6 @@
       }      
    }  
    
-   private class SourceListener implements MessageListener
-   {
-      public void onMessage(Message msg)
-      {
-         synchronized (lock)
-         {
-            if (failed)
-            {
-               //Ignore the message
-               if (trace) { log.trace("JMSBridge has failed so ignoring message"); }
-                              
-               return;
-            }
-            
-            if (trace) { log.trace(this + " received message " + msg); }
-            
-            messages.add(msg);
-            
-            batchExpiryTime = System.currentTimeMillis() + maxBatchTime;            
-            
-            if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
-            
-            if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
-            {
-               if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
-               
-               sendBatch();
-               
-               if (trace) { log.trace(this + " sent batch"); }
-            }                        
-         }
-      }      
-   }   
-   
    private class BridgeExceptionListener implements ExceptionListener
    {
 		public void onException(JMSException e)



More information about the hornetq-commits mailing list