[hornetq-commits] JBoss hornetq SVN: r10324 - in branches/Branch_2_2_EAP/src/main/org/hornetq: core/server/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Mar 14 11:46:47 EDT 2011


Author: ataylor
Date: 2011-03-14 11:46:46 -0400 (Mon, 14 Mar 2011)
New Revision: 10324

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAXAResource.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://issues.jboss.org/browse/JBPAPP-6049 - second part of fix to handle exceptions during xa calls and to handle xa retry properly

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-03-13 02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-03-14 15:46:46 UTC (rev 10324)
@@ -190,6 +190,8 @@
 
    private volatile SimpleString defaultAddress;
 
+   private boolean xaRetry = false;
+
    // Constructors ----------------------------------------------------------------------------
 
    public ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
@@ -631,6 +633,15 @@
       return xa;
    }
 
+   public void resetIfNeeded() throws HornetQException
+   {
+      if(rollbackOnly)
+      {
+         log.warn("resetting session after failure");
+         rollback(false);
+      }
+   }
+
    public void start() throws HornetQException
    {
       checkClosed();
@@ -1193,9 +1204,10 @@
    {
       checkXA();
 
+      //we should never throw rollback if we have already prepared
       if (rollbackOnly)
       {
-         throw new XAException(XAException.XA_RBOTHER);
+         log.warn("committing transaction after failover occurred, any non persistent messages may be lost");
       }
 
       // Note - don't need to flush acks since the previous end would have
@@ -1211,29 +1223,27 @@
 
          if (response.isError())
          {
+            //if we retry and it's not there then assume that it was committed
+            if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+            {
+               return;
+            }
             throw new XAException(response.getResponseCode());
          }
       }
       catch (HornetQException e)
       {
-         ClientSessionImpl.log.warn(e.getMessage(), e);
+         ClientSessionImpl.log.warn("failover occured during commit throwing XAException.XA_RETRY");
 
          if (e.getCode() == HornetQException.UNBLOCKED)
          {
             // Unblocked on failover
+            xaRetry = true;
+            throw new XAException(XAException.XA_RETRY);
+         }
 
-            try
-            {
-               rollback(false);
-            }
-            catch (HornetQException e2)
-            {
-               throw new XAException(XAException.XAER_RMERR);
-            }
+         ClientSessionImpl.log.warn(e.getMessage(), e);
 
-            throw new XAException(XAException.XA_RBOTHER);
-         }
-
          // This should never occur
          throw new XAException(XAException.XAER_RMERR);
       }
@@ -1365,17 +1375,35 @@
          }
          else
          {
+            xaRetry = false;
             return response.getResponseCode();
          }
       }
       catch (HornetQException e)
       {
-         ClientSessionImpl.log.warn(e.getMessage(), e);
-
          if (e.getCode() == HornetQException.UNBLOCKED)
          {
             // Unblocked on failover
+            try
+            {
+               log.warn("failover occurred during prepare re-trying");
+               SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
 
+               if (response.isError())
+               {
+                  throw new XAException(response.getResponseCode());
+               }
+               else
+               {
+                  xaRetry = false;
+                  return response.getResponseCode();
+               }
+            }
+            catch (HornetQException e1)
+            {
+               //ignore and rollback
+            }
+            log.warn("failover occurred during prepare rolling back");
             try
             {
                rollback(false);
@@ -1385,9 +1413,13 @@
                throw new XAException(XAException.XAER_RMERR);
             }
 
+            ClientSessionImpl.log.warn(e.getMessage(), e);
+
             throw new XAException(XAException.XA_RBOTHER);
          }
 
+         ClientSessionImpl.log.warn(e.getMessage(), e);
+
          // This should never occur
          throw new XAException(XAException.XAER_RMERR);
       }
@@ -1455,11 +1487,22 @@
 
          if (response.isError())
          {
+            //if we retry and it's not there then assume that it was rolled back
+            if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+            {
+               return;
+            }
             throw new XAException(response.getResponseCode());
          }
       }
       catch (HornetQException e)
       {
+         if (e.getCode() == HornetQException.UNBLOCKED)
+         {
+            // Unblocked on failover
+            xaRetry = true;
+            throw new XAException(XAException.XA_RETRY);
+         }
          // This should never occur
          throw new XAException(XAException.XAER_RMERR);
       }
@@ -1485,10 +1528,11 @@
    public void start(final Xid xid, final int flags) throws XAException
    {
       checkXA();
+
+      Packet packet = null;
+
       try
       {
-         Packet packet;
-
          if (flags == XAResource.TMJOIN)
          {
             packet = new SessionXAJoinMessage(xid);
@@ -1519,6 +1563,27 @@
       }
       catch (HornetQException e)
       {
+         //we can retry this only because we know for sure that no work would have been done
+         if (e.getCode() == HornetQException.UNBLOCKED)
+         {
+            try
+            {
+               SessionXAResponseMessage response = (SessionXAResponseMessage)channel.sendBlocking(packet);
+
+               if (response.isError())
+               {
+                  ClientSessionImpl.log.error("XA operation failed " + response.getMessage() +
+                                              " code:" +
+                                              response.getResponseCode());
+                  throw new XAException(response.getResponseCode());
+               }
+            }
+            catch (HornetQException e1)
+            {
+               // This should never occur
+               throw new XAException(XAException.XAER_RMERR);
+            }
+         }
          // This should never occur
          throw new XAException(XAException.XAER_RMERR);
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2011-03-13 02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2011-03-14 15:46:46 UTC (rev 10324)
@@ -87,4 +87,7 @@
    void setAddress(Message message, SimpleString address);
    
    void setPacketSize(int packetSize);
+
+   void resetIfNeeded() throws HornetQException;
+
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-03-13 02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-03-14 15:46:46 UTC (rev 10324)
@@ -492,6 +492,11 @@
       return session.setTransactionTimeout(seconds);
    }
 
+   public void resetIfNeeded() throws HornetQException
+   {
+      session.resetIfNeeded();
+   }
+
    public void start() throws HornetQException
    {
       session.start();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-03-13 02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-03-14 15:46:46 UTC (rev 10324)
@@ -579,7 +579,14 @@
 
       doRollback(considerLastMessageAsDelivered, tx);
 
-      tx = new TransactionImpl(storageManager, timeoutSeconds);
+      if (xa)
+      {
+         tx = null;
+      }
+      else
+      {
+         tx = new TransactionImpl(storageManager, timeoutSeconds);
+      }
    }
 
    public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
@@ -878,6 +885,10 @@
                throw new HornetQXAException(XAException.XAER_PROTO,
                                             "Cannot prepare transaction, it is suspended " + xid);
             }
+            else if(theTx.getState() == Transaction.State.PREPARED)
+            {
+               log.info("ignoring prepare on xid as already called :" + xid);
+            }
             else
             {
                theTx.prepare();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java	2011-03-13 02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQConnection.java	2011-03-14 15:46:46 UTC (rev 10324)
@@ -628,7 +628,7 @@
 
          HornetQConnection conn = connectionRef.get();
 
-         if (conn != null)
+         if (conn != null && ! failedOver)
          {
             try
             {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAXAResource.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAXAResource.java	2011-03-13 02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/HornetQRAXAResource.java	2011-03-14 15:46:46 UTC (rev 10324)
@@ -17,6 +17,8 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.logging.Logger;
 
 /**
@@ -70,8 +72,19 @@
       }
 
       managedConnection.lock();
+
+      ClientSessionInternal sessionInternal = (ClientSessionInternal) xaResource;
       try
       {
+         //this resets any tx state; we assume here that the TM and JCA layers are well behaved when it comes to this
+         sessionInternal.resetIfNeeded();
+      }
+      catch (HornetQException e)
+      {
+         log.warn("problem resetting HornetQ xa session after failure");
+      }
+      try
+      {
          xaResource.start(xid, flags);
       }
       finally

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2011-03-13 02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2011-03-14 15:46:46 UTC (rev 10324)
@@ -33,6 +33,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.jms.client.HornetQConnectionFactory;
 import org.hornetq.jms.client.HornetQDestination;
@@ -287,7 +288,7 @@
          try
          {
             session = setupSession();
-            HornetQMessageHandler handler = new HornetQMessageHandler(this, ra.getTM(), session, i);
+            HornetQMessageHandler handler = new HornetQMessageHandler(this, ra.getTM(), (ClientSessionInternal) session, i);
             handler.setup();
             session.start();
             handlers.add(handler);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2011-03-13 02:22:03 UTC (rev 10323)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2011-03-14 15:46:46 UTC (rev 10324)
@@ -28,6 +28,7 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.api.core.client.ClientSession.QueueQuery;
+import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.jms.client.HornetQDestination;
 import org.hornetq.jms.client.HornetQMessage;
@@ -55,7 +56,7 @@
    /**
     * The session
     */
-   private final ClientSession session;
+   private final ClientSessionInternal session;
 
    private ClientConsumer consumer;
 
@@ -76,7 +77,7 @@
 
    public HornetQMessageHandler(final HornetQActivation activation,
                                 final TransactionManager tm,
-                                final ClientSession session,
+                                final ClientSessionInternal session,
                                 final int sessionNr)
    {
       this.activation = activation;
@@ -318,6 +319,17 @@
             }
          }
       }
+      finally
+      {
+         try
+         {
+            session.resetIfNeeded();
+         }
+         catch (HornetQException e)
+         {
+            log.warn("unable to reset session after failure");
+         }
+      }
 
    }
 



More information about the hornetq-commits mailing list