Author: ataylor
Date: 2011-03-14 11:46:46 -0400 (Mon, 14 Mar 2011)
New Revision: 10324
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAXAResource.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://issues.jboss.org/browse/JBPAPP-6049 - second part of fix to handle exceptions
during xa calls and to handle xa retry properly
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-13
02:22:03 UTC (rev 10323)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-03-14
15:46:46 UTC (rev 10324)
@@ -190,6 +190,8 @@
private volatile SimpleString defaultAddress;
+ private boolean xaRetry = false;
+
// Constructors
----------------------------------------------------------------------------
public ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
@@ -631,6 +633,15 @@
return xa;
}
+ public void resetIfNeeded() throws HornetQException
+ {
+ if(rollbackOnly)
+ {
+ log.warn("resetting session after failure");
+ rollback(false);
+ }
+ }
+
public void start() throws HornetQException
{
checkClosed();
@@ -1193,9 +1204,10 @@
{
checkXA();
+ //we should never throw rollback if we have already prepared
if (rollbackOnly)
{
- throw new XAException(XAException.XA_RBOTHER);
+ log.warn("committing transaction after failover occurred, any non
persistent messages may be lost");
}
// Note - don't need to flush acks since the previous end would have
@@ -1211,29 +1223,27 @@
if (response.isError())
{
+ //if we retry and its not there the assume that it was committed
+ if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+ {
+ return;
+ }
throw new XAException(response.getResponseCode());
}
}
catch (HornetQException e)
{
- ClientSessionImpl.log.warn(e.getMessage(), e);
+ ClientSessionImpl.log.warn("failover occured during commit throwing
XAException.XA_RETRY");
if (e.getCode() == HornetQException.UNBLOCKED)
{
// Unblocked on failover
+ xaRetry = true;
+ throw new XAException(XAException.XA_RETRY);
+ }
- try
- {
- rollback(false);
- }
- catch (HornetQException e2)
- {
- throw new XAException(XAException.XAER_RMERR);
- }
+ ClientSessionImpl.log.warn(e.getMessage(), e);
- throw new XAException(XAException.XA_RBOTHER);
- }
-
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
@@ -1365,17 +1375,35 @@
}
else
{
+ xaRetry = false;
return response.getResponseCode();
}
}
catch (HornetQException e)
{
- ClientSessionImpl.log.warn(e.getMessage(), e);
-
if (e.getCode() == HornetQException.UNBLOCKED)
{
// Unblocked on failover
+ try
+ {
+ log.warn("failover occurred during prepare re-trying");
+ SessionXAResponseMessage response =
(SessionXAResponseMessage)channel.sendBlocking(packet);
+ if (response.isError())
+ {
+ throw new XAException(response.getResponseCode());
+ }
+ else
+ {
+ xaRetry = false;
+ return response.getResponseCode();
+ }
+ }
+ catch (HornetQException e1)
+ {
+ //ignore and rollback
+ }
+ log.warn("failover occurred during prepare rolling back");
try
{
rollback(false);
@@ -1385,9 +1413,13 @@
throw new XAException(XAException.XAER_RMERR);
}
+ ClientSessionImpl.log.warn(e.getMessage(), e);
+
throw new XAException(XAException.XA_RBOTHER);
}
+ ClientSessionImpl.log.warn(e.getMessage(), e);
+
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
@@ -1455,11 +1487,22 @@
if (response.isError())
{
+ //if we retry and its not there the assume that it was rolled back
+ if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+ {
+ return;
+ }
throw new XAException(response.getResponseCode());
}
}
catch (HornetQException e)
{
+ if (e.getCode() == HornetQException.UNBLOCKED)
+ {
+ // Unblocked on failover
+ xaRetry = true;
+ throw new XAException(XAException.XA_RETRY);
+ }
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
@@ -1485,10 +1528,11 @@
public void start(final Xid xid, final int flags) throws XAException
{
checkXA();
+
+ Packet packet = null;
+
try
{
- Packet packet;
-
if (flags == XAResource.TMJOIN)
{
packet = new SessionXAJoinMessage(xid);
@@ -1519,6 +1563,27 @@
}
catch (HornetQException e)
{
+ //we can retry this only because we know for sure that no work would have been
done
+ if (e.getCode() == HornetQException.UNBLOCKED)
+ {
+ try
+ {
+ SessionXAResponseMessage response =
(SessionXAResponseMessage)channel.sendBlocking(packet);
+
+ if (response.isError())
+ {
+ ClientSessionImpl.log.error("XA operation failed " +
response.getMessage() +
+ " code:" +
+ response.getResponseCode());
+ throw new XAException(response.getResponseCode());
+ }
+ }
+ catch (HornetQException e1)
+ {
+ // This should never occur
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2011-03-13
02:22:03 UTC (rev 10323)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2011-03-14
15:46:46 UTC (rev 10324)
@@ -87,4 +87,7 @@
void setAddress(Message message, SimpleString address);
void setPacketSize(int packetSize);
+
+ void resetIfNeeded() throws HornetQException;
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-03-13
02:22:03 UTC (rev 10323)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2011-03-14
15:46:46 UTC (rev 10324)
@@ -492,6 +492,11 @@
return session.setTransactionTimeout(seconds);
}
+ public void resetIfNeeded() throws HornetQException
+ {
+ session.resetIfNeeded();
+ }
+
public void start() throws HornetQException
{
session.start();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-13
02:22:03 UTC (rev 10323)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-14
15:46:46 UTC (rev 10324)
@@ -579,7 +579,14 @@
doRollback(considerLastMessageAsDelivered, tx);
- tx = new TransactionImpl(storageManager, timeoutSeconds);
+ if (xa)
+ {
+ tx = null;
+ }
+ else
+ {
+ tx = new TransactionImpl(storageManager, timeoutSeconds);
+ }
}
public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
@@ -878,6 +885,10 @@
throw new HornetQXAException(XAException.XAER_PROTO,
"Cannot prepare transaction, it is
suspended " + xid);
}
+ else if(theTx.getState() == Transaction.State.PREPARED)
+ {
+ log.info("ignoring prepare on xid as already called :" + xid);
+ }
else
{
theTx.prepare();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-03-13
02:22:03 UTC (rev 10323)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java 2011-03-14
15:46:46 UTC (rev 10324)
@@ -628,7 +628,7 @@
HornetQConnection conn = connectionRef.get();
- if (conn != null)
+ if (conn != null && ! failedOver)
{
try
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAXAResource.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAXAResource.java 2011-03-13
02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAXAResource.java 2011-03-14
15:46:46 UTC (rev 10324)
@@ -17,6 +17,8 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
/**
@@ -70,8 +72,19 @@
}
managedConnection.lock();
+
+ ClientSessionInternal sessionInternal = (ClientSessionInternal) xaResource;
try
{
+ //this resets any tx stuff, we assume here that the tm and jca layer are well
behaved when it comes to this
+ sessionInternal.resetIfNeeded();
+ }
+ catch (HornetQException e)
+ {
+ log.warn("problem resetting HornetQ xa session after failure");
+ }
+ try
+ {
xaResource.start(xid, flags);
}
finally
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-03-13
02:22:03 UTC (rev 10323)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2011-03-14
15:46:46 UTC (rev 10324)
@@ -33,6 +33,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
@@ -287,7 +288,7 @@
try
{
session = setupSession();
- HornetQMessageHandler handler = new HornetQMessageHandler(this, ra.getTM(),
session, i);
+ HornetQMessageHandler handler = new HornetQMessageHandler(this, ra.getTM(),
(ClientSessionInternal) session, i);
handler.setup();
session.start();
handlers.add(handler);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2011-03-13
02:22:03 UTC (rev 10323)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2011-03-14
15:46:46 UTC (rev 10324)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.api.core.client.ClientSession.QueueQuery;
+import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
@@ -55,7 +56,7 @@
/**
* The session
*/
- private final ClientSession session;
+ private final ClientSessionInternal session;
private ClientConsumer consumer;
@@ -76,7 +77,7 @@
public HornetQMessageHandler(final HornetQActivation activation,
final TransactionManager tm,
- final ClientSession session,
+ final ClientSessionInternal session,
final int sessionNr)
{
this.activation = activation;
@@ -318,6 +319,17 @@
}
}
}
+ finally
+ {
+ try
+ {
+ session.resetIfNeeded();
+ }
+ catch (HornetQException e)
+ {
+ log.warn("unable to reset session after failure");
+ }
+ }
}