[hornetq-commits] JBoss hornetq SVN: r10362 - in branches/Branch_2_2_EAP/src/main/org/hornetq: core/postoffice/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 25 04:23:38 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-25 04:23:37 -0400 (Fri, 25 Mar 2011)
New Revision: 10362

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://issues.jboss.org/browse/JBPAPP-6153 - avoiding message loss on shutdown

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2011-03-25 08:23:37 UTC (rev 10362)
@@ -134,7 +134,7 @@
    @Override
    public String toString()
    {
-      return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + "]";
+      return "ClientMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
    }
 
    /* (non-Javadoc)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-03-25 08:23:37 UTC (rev 10362)
@@ -635,7 +635,7 @@
 
    public void resetIfNeeded() throws HornetQException
    {
-      if(rollbackOnly)
+      if (rollbackOnly)
       {
          log.warn("resetting session after failure");
          rollback(false);
@@ -1204,7 +1204,7 @@
    {
       checkXA();
 
-      //we should never throw rollback if we have already prepared
+      // we should never throw rollback if we have already prepared
       if (rollbackOnly)
       {
          log.warn("committing transaction after failover occurred, any non persistent messages may be lost");
@@ -1223,8 +1223,8 @@
 
          if (response.isError())
          {
-            //if we retry and its not there the assume that it was committed
-            if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+            // if we retry and its not there the assume that it was committed
+            if (xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
             {
                return;
             }
@@ -1395,7 +1395,7 @@
             }
             catch (HornetQException e1)
             {
-               //ignore and rollback
+               // ignore and rollback
             }
             log.warn("failover occurred during prepare rolling back");
             try
@@ -1481,8 +1481,8 @@
 
          if (response.isError())
          {
-            //if we retry and its not there the assume that it was rolled back
-            if(xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
+            // if we retry and its not there the assume that it was rolled back
+            if (xaRetry && response.getResponseCode() == XAException.XAER_NOTA)
             {
                return;
             }
@@ -1557,7 +1557,7 @@
       }
       catch (HornetQException e)
       {
-         //we can retry this only because we know for sure that no work would have been done
+         // we can retry this only because we know for sure that no work would have been done
          if (e.getCode() == HornetQException.UNBLOCKED)
          {
             try

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-25 08:23:37 UTC (rev 10362)
@@ -1062,10 +1062,7 @@
          {
             StringBuffer warnMessage = new StringBuffer();
             warnMessage.append("Duplicate message detected through the bridge - message will not be routed. Message information:\n");
-            for (SimpleString key : message.getPropertyNames())
-            {
-               warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
-            }
+            warnMessage.append(message.toString());
             PostOfficeImpl.log.warn(warnMessage.toString());
             
             if (context.getTransaction() != null)
@@ -1105,10 +1102,7 @@
          {
             StringBuffer warnMessage = new StringBuffer();
             warnMessage.append("Duplicate message detected - message will not be routed. Message information:\n");
-            for (SimpleString key : message.getPropertyNames())
-            {
-               warnMessage.append(key + "=" + message.getObjectProperty(key) + "\n");
-            }
+            warnMessage.append(message.toString());
             PostOfficeImpl.log.warn(warnMessage.toString());
 
             if (context.getTransaction() != null)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-03-25 08:23:37 UTC (rev 10362)
@@ -642,6 +642,16 @@
          }
 
       }
+      
+      // We close all the sessions in an attempt to let any pending IO finish
+      // to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
+      // It may still be possible to have this scenario on a real failure (without the use of XA)
+      // But at least we will do our best to avoid it on regular shutdowns
+      for (ServerSession session : sessions.values())
+      {
+    	 log.info("closing a session" );
+         session.close(true);
+      }
 
       remotingService.stop();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2011-03-25 08:23:37 UTC (rev 10362)
@@ -175,7 +175,7 @@
    {
       return "Reference[" + getMessage().getMessageID() +
              "]:" +
-             (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE");
+             (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") + ":" + getMessage() ;
    }
    // Package protected ---------------------------------------------
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2011-03-25 08:23:37 UTC (rev 10362)
@@ -273,7 +273,7 @@
    @Override
    public String toString()
    {
-      return "ServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + "]";
+      return "ServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]";
    }
 
    // FIXME - this is stuff that is only used in large messages

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2011-03-24 18:09:04 UTC (rev 10361)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2011-03-25 08:23:37 UTC (rev 10362)
@@ -19,15 +19,15 @@
 import javax.resource.ResourceException;
 import javax.resource.spi.endpoint.MessageEndpoint;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSession.QueueQuery;
 import org.hornetq.api.core.client.MessageHandler;
-import org.hornetq.api.core.client.ClientSession.QueueQuery;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.jms.client.HornetQDestination;
@@ -70,6 +70,8 @@
    private boolean useLocalTx;
    
    private boolean transacted;
+   
+   private boolean useXA = false;
 
    private final int sessionNr;
 
@@ -181,10 +183,12 @@
       if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
       {
          endpoint = endpointFactory.createEndpoint(session);
+         useXA = true;
       }
       else
       {
          endpoint = endpointFactory.createEndpoint(null);
+         useXA = false;
       }
       consumer.setMessageHandler(this);
    }
@@ -246,6 +250,8 @@
 
    public void onMessage(final ClientMessage message)
    {
+      HornetQMessageHandler.log.info("onMessage(" + message + ")");
+
       if (HornetQMessageHandler.trace)
       {
          HornetQMessageHandler.log.trace("onMessage(" + message + ")");
@@ -298,6 +304,35 @@
          // we need to call before/afterDelivery as a pair
          if (beforeDelivery)
          {
+            if (useXA && tm != null)
+            {
+               // This is the job of the container;
+               // however, if the container throws an exception because of some other error,
+               // there are situations where the container does not set rollback-only.
+               // This is to avoid a scenario where afterDelivery would kick in
+               try
+               {
+                  Transaction tx = tm.getTransaction();
+                  if (tx != null)
+                  {
+                     tx.setRollbackOnly();
+                  }
+               }
+               catch (Exception e1)
+               {
+                  log.warn("unnable to clear the transaction", e1);
+                  try
+                  {
+                     session.rollback();
+                  }
+                  catch (HornetQException e2)
+                  {
+                     log.warn("Unable to rollback", e2);
+                     return;
+                  }
+               }
+            }
+
             try
             {
                endpoint.afterDelivery();



More information about the hornetq-commits mailing list