Author: jmesnil
Date: 2009-09-18 08:17:47 -0400 (Fri, 18 Sep 2009)
New Revision: 7969
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
Log:
HORNETQ-27: Race condition in JMS Bridge between enlisting the XAResource in the TX and
message delivery
* replace the source's MessageListener by polling the source Consumer in a thread to
ensure there is no
race condition between tx resource enlistment and message delivery
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-09-17 22:02:17 UTC
(rev 7968)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-09-18 12:17:47 UTC
(rev 7969)
@@ -13,13 +13,12 @@
package org.hornetq.jms.bridge.impl;
+import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -48,7 +47,6 @@
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQSession;
-import org.jboss.tm.TransactionManagerLocator;
/**
*
@@ -278,6 +276,9 @@
if (trace) { log.trace("Started time checker thread"); }
}
+ Thread receiver = new SourceReceiver();
+ receiver.start();
+
if (trace) { log.trace("Started " + this); }
}
else
@@ -285,6 +286,7 @@
log.warn("Failed to start bridge");
handleFailureOnStartup();
}
+
}
public synchronized void stop() throws Exception
@@ -1027,8 +1029,6 @@
}
targetProducer = sess.createProducer(null);
-
- sourceConsumer.setMessageListener(new SourceListener());
return true;
}
@@ -1414,6 +1414,79 @@
// Inner classes ---------------------------------------------------------------
+
+ /**
+ * We use a Thread which polls the sourceDestination instead of a MessageListener
+ * to ensure that message delivery does not happen concurrently with
+ * transaction enlistment of the XAResource (see HORNETQ-27)
+ *
+ */
+ private final class SourceReceiver extends Thread
+ {
+ @Override
+ public void run()
+ {
+ while(isStarted())
+ {
+ synchronized (lock)
+ {
+ if (isPaused() || failed)
+ {
+ try
+ {
+ lock.wait(500);
+ }
+ catch (InterruptedException e)
+ {
+ if (trace) { log.trace(this + " thread was interrupted");
}
+ }
+ continue;
+ }
+
+ Message msg = null;
+ try
+ {
+ msg = sourceConsumer.receive(1000);
+ }
+ catch (JMSException jmse)
+ {
+ if (trace) { log.trace(this + " exception while receiving a
message", jmse); }
+ }
+
+ if (msg == null)
+ {
+ try
+ {
+ lock.wait(500);
+ }
+ catch (InterruptedException e)
+ {
+ if (trace) { log.trace(this + " thread was interrupted");
}
+ }
+ continue;
+ }
+
+ if (trace) { log.trace(this + " received message " + msg); }
+
+ messages.add(msg);
+
+ batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+
+ if (trace) { log.trace(this + " rescheduled batchExpiryTime to "
+ batchExpiryTime); }
+
+ if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
+ {
+ if (trace) { log.trace(this + " maxBatchSize has been reached so
sending batch"); }
+
+ sendBatch();
+
+ if (trace) { log.trace(this + " sent batch"); }
+ }
+ }
+ }
+ }
+ }
+
private class FailureHandler implements Runnable
{
/**
@@ -1575,40 +1648,6 @@
}
}
- private class SourceListener implements MessageListener
- {
- public void onMessage(Message msg)
- {
- synchronized (lock)
- {
- if (failed)
- {
- //Ignore the message
- if (trace) { log.trace("JMSBridge has failed so ignoring
message"); }
-
- return;
- }
-
- if (trace) { log.trace(this + " received message " + msg); }
-
- messages.add(msg);
-
- batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-
- if (trace) { log.trace(this + " rescheduled batchExpiryTime to " +
batchExpiryTime); }
-
- if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
- {
- if (trace) { log.trace(this + " maxBatchSize has been reached so
sending batch"); }
-
- sendBatch();
-
- if (trace) { log.trace(this + " sent batch"); }
- }
- }
- }
- }
-
private class BridgeExceptionListener implements ExceptionListener
{
public void onException(JMSException e)
Show replies by date