Author: clebert.suconic(a)jboss.com
Date: 2011-03-25 04:23:37 -0400 (Fri, 25 Mar 2011)
New Revision: 10362
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153 - avoiding message loss on shutdown
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2011-03-24
18:09:04 UTC (rev 10361)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2011-03-25
08:23:37 UTC (rev 10362)
@@ -134,7 +134,7 @@
@Override
public String toString()
{
- return "ClientMessage[messageID=" + messageID + ", durable=" +
durable + ", address=" + getAddress() + "]";
+ return "ClientMessage[messageID=" + messageID + ", durable=" +
durable + ", address=" + getAddress() + ",properties=" +
properties.toString() + "]";
}
/* (non-Javadoc)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-24
18:09:04 UTC (rev 10361)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-25
08:23:37 UTC (rev 10362)
@@ -635,7 +635,7 @@
public void resetIfNeeded() throws HornetQException
{
- if(rollbackOnly)
+ if (rollbackOnly)
{
log.warn("resetting session after failure");
rollback(false);
@@ -1204,7 +1204,7 @@
{
checkXA();
- //we should never throw rollback if we have already prepared
+ // we should never throw rollback if we have already prepared
if (rollbackOnly)
{
log.warn("committing transaction after failover occurred, any non
persistent messages may be lost");
@@ -1223,8 +1223,8 @@
if (response.isError())
{
- //if we retry and its not there the assume that it was committed
- if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+ // if we retry and its not there the assume that it was committed
+ if (xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
{
return;
}
@@ -1395,7 +1395,7 @@
}
catch (HornetQException e1)
{
- //ignore and rollback
+ // ignore and rollback
}
log.warn("failover occurred during prepare rolling back");
try
@@ -1481,8 +1481,8 @@
if (response.isError())
{
- //if we retry and its not there the assume that it was rolled back
- if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+ // if we retry and its not there the assume that it was rolled back
+ if (xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
{
return;
}
@@ -1557,7 +1557,7 @@
}
catch (HornetQException e)
{
- //we can retry this only because we know for sure that no work would have been
done
+ // we can retry this only because we know for sure that no work would have been
done
if (e.getCode() == HornetQException.UNBLOCKED)
{
try
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-24
18:09:04 UTC (rev 10361)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-25
08:23:37 UTC (rev 10362)
@@ -1062,10 +1062,7 @@
{
StringBuffer warnMessage = new StringBuffer();
warnMessage.append("Duplicate message detected through the bridge -
message will not be routed. Message information:\n");
- for (SimpleString key : message.getPropertyNames())
- {
- warnMessage.append(key + "=" + message.getObjectProperty(key) +
"\n");
- }
+ warnMessage.append(message.toString());
PostOfficeImpl.log.warn(warnMessage.toString());
if (context.getTransaction() != null)
@@ -1105,10 +1102,7 @@
{
StringBuffer warnMessage = new StringBuffer();
warnMessage.append("Duplicate message detected - message will not be
routed. Message information:\n");
- for (SimpleString key : message.getPropertyNames())
- {
- warnMessage.append(key + "=" + message.getObjectProperty(key) +
"\n");
- }
+ warnMessage.append(message.toString());
PostOfficeImpl.log.warn(warnMessage.toString());
if (context.getTransaction() != null)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-24
18:09:04 UTC (rev 10361)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25
08:23:37 UTC (rev 10362)
@@ -642,6 +642,16 @@
}
}
+
+ // We close all the exception in an attempt to let any pending IO to finish
+ // to avoid scenarios where the send or ACK got to disk but the response didn't
get to the client
+ // It may still be possible to have this scenario on a real failure (without the
use of XA)
+ // But at least we will do our best to avoid it on regular shutdowns
+ for (ServerSession session : sessions.values())
+ {
+ log.info("closing a session" );
+ session.close(true);
+ }
remotingService.stop();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-24
18:09:04 UTC (rev 10361)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-25
08:23:37 UTC (rev 10362)
@@ -175,7 +175,7 @@
{
return "Reference[" + getMessage().getMessageID() +
"]:" +
- (getMessage().isDurable() ? "RELIABLE" :
"NON-RELIABLE");
+ (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE")
+ ":" + getMessage() ;
}
// Package protected ---------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-03-24
18:09:04 UTC (rev 10361)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2011-03-25
08:23:37 UTC (rev 10362)
@@ -273,7 +273,7 @@
@Override
public String toString()
{
- return "ServerMessage[messageID=" + messageID + ", durable=" +
durable + ", address=" + getAddress() + "]";
+ return "ServerMessage[messageID=" + messageID + ", durable=" +
durable + ", address=" + getAddress() + ",properties=" +
properties.toString() + "]";
}
// FIXME - this is stuff that is only used in large messages
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2011-03-24
18:09:04 UTC (rev 10361)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2011-03-25
08:23:37 UTC (rev 10362)
@@ -19,15 +19,15 @@
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.api.core.client.MessageHandler;
-import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQDestination;
@@ -70,6 +70,8 @@
private boolean useLocalTx;
private boolean transacted;
+
+ private boolean useXA = false;
private final int sessionNr;
@@ -181,10 +183,12 @@
if (activation.isDeliveryTransacted() &&
!activation.getActivationSpec().isUseLocalTx())
{
endpoint = endpointFactory.createEndpoint(session);
+ useXA = true;
}
else
{
endpoint = endpointFactory.createEndpoint(null);
+ useXA = false;
}
consumer.setMessageHandler(this);
}
@@ -246,6 +250,8 @@
public void onMessage(final ClientMessage message)
{
+ HornetQMessageHandler.log.info("onMessage(" + message + ")");
+
if (HornetQMessageHandler.trace)
{
HornetQMessageHandler.log.trace("onMessage(" + message +
")");
@@ -298,6 +304,35 @@
// we need to call before/afterDelivery as a pair
if (beforeDelivery)
{
+ if (useXA && tm != null)
+ {
+ // This is the job for the container,
+ // however if the container throws an exception because of some other
errors,
+ // there are situations where the container is not setting the rollback
only
+ // this is to avoid a scenario where afterDelivery would kick in
+ try
+ {
+ Transaction tx = tm.getTransaction();
+ if (tx != null)
+ {
+ tx.setRollbackOnly();
+ }
+ }
+ catch (Exception e1)
+ {
+ log.warn("unnable to clear the transaction", e1);
+ try
+ {
+ session.rollback();
+ }
+ catch (HornetQException e2)
+ {
+ log.warn("Unable to rollback", e2);
+ return;
+ }
+ }
+ }
+
try
{
endpoint.afterDelivery();