[hornetq-commits] JBoss hornetq SVN: r7969 - trunk/src/main/org/hornetq/jms/bridge/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Sep 18 08:17:48 EDT 2009
Author: jmesnil
Date: 2009-09-18 08:17:47 -0400 (Fri, 18 Sep 2009)
New Revision: 7969
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
Log:
HORNETQ-27: Race condition in JMS Bridge between enlisting the XAResource in the TX and message delivery
* replace the source's MessageListener by polling the source Consumer in a thread to ensure there is no
race condition between tx resource enlistment and message delivery
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-09-17 22:02:17 UTC (rev 7968)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-09-18 12:17:47 UTC (rev 7969)
@@ -13,13 +13,12 @@
package org.hornetq.jms.bridge.impl;
+import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -48,7 +47,6 @@
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQSession;
-import org.jboss.tm.TransactionManagerLocator;
/**
*
@@ -278,6 +276,9 @@
if (trace) { log.trace("Started time checker thread"); }
}
+ Thread receiver = new SourceReceiver();
+ receiver.start();
+
if (trace) { log.trace("Started " + this); }
}
else
@@ -285,6 +286,7 @@
log.warn("Failed to start bridge");
handleFailureOnStartup();
}
+
}
public synchronized void stop() throws Exception
@@ -1027,8 +1029,6 @@
}
targetProducer = sess.createProducer(null);
-
- sourceConsumer.setMessageListener(new SourceListener());
return true;
}
@@ -1414,6 +1414,79 @@
// Inner classes ---------------------------------------------------------------
+
+ /**
+ * We use a Thread which polls the sourceDestination instead of a MessageListener
+ * to ensure that message delivery does not happen concurrently with
+ * transaction enlistment of the XAResource (see HORNETQ-27)
+ *
+ */
+ private final class SourceReceiver extends Thread
+ {
+ @Override
+ public void run()
+ {
+ while(isStarted())
+ {
+ synchronized (lock)
+ {
+ if (isPaused() || failed)
+ {
+ try
+ {
+ lock.wait(500);
+ }
+ catch (InterruptedException e)
+ {
+ if (trace) { log.trace(this + " thread was interrupted"); }
+ }
+ continue;
+ }
+
+ Message msg = null;
+ try
+ {
+ msg = sourceConsumer.receive(1000);
+ }
+ catch (JMSException jmse)
+ {
+ if (trace) { log.trace(this + " exception while receiving a message", jmse); }
+ }
+
+ if (msg == null)
+ {
+ try
+ {
+ lock.wait(500);
+ }
+ catch (InterruptedException e)
+ {
+ if (trace) { log.trace(this + " thread was interrupted"); }
+ }
+ continue;
+ }
+
+ if (trace) { log.trace(this + " received message " + msg); }
+
+ messages.add(msg);
+
+ batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+
+ if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
+
+ if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
+ {
+ if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
+
+ sendBatch();
+
+ if (trace) { log.trace(this + " sent batch"); }
+ }
+ }
+ }
+ }
+ }
+
private class FailureHandler implements Runnable
{
/**
@@ -1575,40 +1648,6 @@
}
}
- private class SourceListener implements MessageListener
- {
- public void onMessage(Message msg)
- {
- synchronized (lock)
- {
- if (failed)
- {
- //Ignore the message
- if (trace) { log.trace("JMSBridge has failed so ignoring message"); }
-
- return;
- }
-
- if (trace) { log.trace(this + " received message " + msg); }
-
- messages.add(msg);
-
- batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-
- if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
-
- if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
- {
- if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
-
- sendBatch();
-
- if (trace) { log.trace(this + " sent batch"); }
- }
- }
- }
- }
-
private class BridgeExceptionListener implements ExceptionListener
{
public void onException(JMSException e)
More information about the hornetq-commits
mailing list