[hornetq-commits] JBoss hornetq SVN: r9598 - in trunk/src/main/org/hornetq: ra/inflow and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Aug 26 13:48:38 EDT 2010
Author: timfox
Date: 2010-08-26 13:48:38 -0400 (Thu, 26 Aug 2010)
New Revision: 9598
Modified:
trunk/src/main/org/hornetq/jms/client/HornetQSession.java
trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/browse/HORNETQ-495
Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java 2010-08-26 17:48:38 UTC (rev 9598)
@@ -213,6 +213,11 @@
return ackMode;
}
+
+ public boolean isXA()
+ {
+ return xa;
+ }
public void commit() throws JMSException
{
Modified: trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java 2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/jms/client/JMSMessageListenerWrapper.java 2010-08-26 17:48:38 UTC (rev 9598)
@@ -42,9 +42,9 @@
private final boolean transactedOrClientAck;
protected JMSMessageListenerWrapper(final HornetQSession session,
- final ClientConsumer consumer,
- final MessageListener listener,
- final int ackMode)
+ final ClientConsumer consumer,
+ final MessageListener listener,
+ final int ackMode)
{
this.session = session;
@@ -52,7 +52,7 @@
this.listener = listener;
- transactedOrClientAck = ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE;
+ transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();
}
/**
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-08-25 15:34:03 UTC (rev 9597)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-08-26 17:48:38 UTC (rev 9598)
@@ -68,11 +68,16 @@
private boolean useLocalTx;
+ private boolean transacted;
+
private final int sessionNr;
private final TransactionManager tm;
- public HornetQMessageHandler(final HornetQActivation activation, final TransactionManager tm, final ClientSession session, final int sessionNr)
+ public HornetQMessageHandler(final HornetQActivation activation,
+ final TransactionManager tm,
+ final ClientSession session,
+ final int sessionNr)
{
this.activation = activation;
this.session = session;
@@ -96,14 +101,16 @@
{
String subscriptionName = spec.getSubscriptionName();
String clientID = spec.getClientID();
-
+
// Durable sub
if (clientID == null)
{
- throw new InvalidClientIDException("Cannot create durable subscription for " + subscriptionName + " - client ID has not been set");
+ throw new InvalidClientIDException("Cannot create durable subscription for " + subscriptionName +
+ " - client ID has not been set");
}
- SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(clientID, subscriptionName));
+ SimpleString queueName = new SimpleString(HornetQDestination.createQueueNameForDurableSubscription(clientID,
+ subscriptionName));
QueueQuery subResponse = session.queueQuery(queueName);
@@ -123,9 +130,10 @@
SimpleString oldFilterString = subResponse.getFilterString();
boolean selectorChanged = selector == null && oldFilterString != null ||
- oldFilterString == null && selector != null ||
- (oldFilterString != null && selector != null &&
- !oldFilterString.toString().equals(selector));
+ oldFilterString == null &&
+ selector != null ||
+ (oldFilterString != null && selector != null && !oldFilterString.toString()
+ .equals(selector));
SimpleString oldTopicName = subResponse.getAddress();
@@ -155,7 +163,7 @@
}
else
{
- queueName = activation.getTopicTemporaryQueue();
+ queueName = activation.getTopicTemporaryQueue();
}
}
else
@@ -168,6 +176,7 @@
// Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
+ transacted = activation.isDeliveryTransacted();
if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
{
endpoint = endpointFactory.createEndpoint(session);
@@ -201,7 +210,7 @@
{
HornetQMessageHandler.log.debug("Error releasing endpoint " + endpoint, t);
}
-
+
try
{
consumer.close();
@@ -246,15 +255,28 @@
try
{
- if(activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null)
+ if (activation.getActivationSpec().getTransactionTimeout() > 0 && tm != null)
{
tm.setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
}
endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
beforeDelivery = true;
msg.doBeforeReceive();
+
+ //In the transacted case the message must be acked *before* onMessage is called
+
+ if (transacted)
+ {
+ message.acknowledge();
+ }
+
((MessageListener)endpoint).onMessage(msg);
- message.acknowledge();
+
+ if (!transacted)
+ {
+ message.acknowledge();
+ }
+
try
{
endpoint.afterDelivery();
More information about the hornetq-commits mailing list