[jboss-cvs] JBossAS SVN: r78659 - in branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server: jmx and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Sep 17 20:36:28 EDT 2008


Author: jhowell at redhat.com
Date: 2008-09-17 20:36:28 -0400 (Wed, 17 Sep 2008)
New Revision: 78659

Modified:
   branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/BasicQueue.java
   branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java
   branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/MessageReference.java
   branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java
   branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java
Log:
[JBAS-5972] - One off for 4.0.4 - >JBAS-4870 - Redelivery flags not updated in case of JBossMQ node failure

Modified: branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/BasicQueue.java
===================================================================
--- branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/BasicQueue.java	2008-09-17 23:44:23 UTC (rev 78658)
+++ branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/BasicQueue.java	2008-09-18 00:36:28 UTC (rev 78659)
@@ -397,13 +397,17 @@
 
       try
       {
+         // Set redelivered, vendor-specific flags
          message.redelivered();
-         // Set redelivered, vendor-specific flags
-         message.invalidate();
-         // Update the persistent message outside the transaction
-         // We want to know the message might have been delivered regardless
-         if (message.isPersistent())
-            server.getPersistenceManager().update(message, null);
+         // Old style update - doesn't survive a crash
+         if (server.isLazyRedeliveryUpdate())
+         {
+            message.invalidate();
+            // Update the persistent message outside the transaction
+            // We want to know the message might have been delivered regardless
+            if (message.isPersistent())
+               server.getPersistenceManager().update(message, null);
+         }
       }
       catch (JMSException e)
       {
@@ -961,6 +965,19 @@
       nack.messageID = message.getJMSMessageID();
       nack.subscriberId = sub.subscriptionId;
 
+      // Need to update the message so it recovers from crash with redelivery=true 
+      if (server.isLazyRedeliveryUpdate() == false && messageRef.isPersistent())
+      {
+         // Temporarily set the flags
+         messageRef.redelivered();
+         // Update the persistent message outside the transaction
+         // We want to know the message might have been delivered regardless
+         messageRef.invalidate();
+         server.getPersistenceManager().update(messageRef, null);
+         // Now revert the flags back again (but only in memory)
+         messageRef.revertRedelivered();
+      }
+      
       synchronized (messages)
       {
          UnackedMessageInfo unacked = new UnackedMessageInfo(messageRef, sub);

Modified: branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java
===================================================================
--- branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java	2008-09-17 23:44:23 UTC (rev 78658)
+++ branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java	2008-09-18 00:36:28 UTC (rev 78659)
@@ -102,6 +102,9 @@
    //The Cache Used to hold messages
    private MessageCache messageCache;
 
+   /** Whether to do the redelivery update lazily */
+   private boolean lazyRedeliveryUpdate;
+
    private Object stateLock = new Object();
 
    private Object idLock = new Object();
@@ -145,6 +148,26 @@
    }
 
    /**
+    * Get the lazyRedeliveryUpdate.
+    * 
+    * @return the lazyRedeliveryUpdate.
+    */
+   public boolean isLazyRedeliveryUpdate()
+   {
+      return lazyRedeliveryUpdate;
+   }
+
+   /**
+    * Set the lazyRedeliveryUpdate.
+    * 
+    * @param lazyRedeliveryUpdate the lazyRedeliveryUpdate.
+    */
+   public void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate)
+   {
+      this.lazyRedeliveryUpdate = lazyRedeliveryUpdate;
+   }
+
+   /**
     * Returns <code>false</code> if the JMS server is currently running and
     * handling requests, <code>true</code> otherwise.
    *

Modified: branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/MessageReference.java
===================================================================
--- branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/MessageReference.java	2008-09-17 23:44:23 UTC (rev 78658)
+++ branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/MessageReference.java	2008-09-18 00:36:28 UTC (rev 78659)
@@ -180,7 +180,7 @@
    /**
     * The message is being redelivered
     */
-   public void redelivered() throws JMSException
+   public synchronized void redelivered() throws JMSException
    {
       this.redelivered = true;
 
@@ -192,7 +192,7 @@
 
       ++redeliveryCount;
       
-      if (queue.parameters.lateClone == false)
+      if (isLateClone() == false)
       {
          SpyMessage message = getMessage();
          message.setJMSRedelivered(redelivered);
@@ -202,6 +202,28 @@
    }
 
    /**
+    * Revert the redelivered status
+    */
+   public synchronized void revertRedelivered() throws JMSException
+   {
+      this.redelivered = true;
+
+      messageScheduledDelivery = 0;
+
+      --redeliveryCount;
+      if (redeliveryCount == 0)
+         redelivered = false;
+      
+      if (isLateClone() == false)
+      {
+         SpyMessage message = getMessage();
+         message.setJMSRedelivered(redelivered);
+
+         message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(redeliveryCount));
+      }
+   }
+
+   /**
     * Returns true if this message reference has expired.
     */
    public boolean isExpired()

Modified: branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java
===================================================================
--- branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java	2008-09-17 23:44:23 UTC (rev 78658)
+++ branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java	2008-09-18 00:36:28 UTC (rev 78659)
@@ -77,6 +77,8 @@
    private ObjectName threadPool;
    /** Default expiry destination */
    private ObjectName expiryDestination;
+   /** Whether to do the redelivery update lazily */
+   private boolean lazyRedeliveryUpdate = false;
 
    /**
     * @jmx:managed-attribute
@@ -260,6 +262,26 @@
    }
 
    /**
+    * Get the lazyRedeliveryUpdate.
+    * 
+    * @return the lazyRedeliveryUpdate.
+    */
+   public boolean isLazyRedeliveryUpdate()
+   {
+      return lazyRedeliveryUpdate;
+   }
+
+   /**
+    * Set the lazyRedeliveryUpdate.
+    * 
+    * @param lazyRedeliveryUpdate the lazyRedeliveryUpdate.
+    */
+   public void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate)
+   {
+      this.lazyRedeliveryUpdate = lazyRedeliveryUpdate;
+   }
+
+   /**
     * @jmx:managed-operation
     */
    public void createQueue(String name) throws Exception

Modified: branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java
===================================================================
--- branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java	2008-09-17 23:44:23 UTC (rev 78658)
+++ branches/JBoss_4_0_4_GA_JBAS-4870/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java	2008-09-18 00:36:28 UTC (rev 78659)
@@ -129,6 +129,15 @@
     */
    public void setExpiryDestination(ObjectName destination);
 
+   /**
+    * Get the lazyRedeliveryUpdate.
+    * 
+    * @return the lazyRedeliveryUpdate.
+    */
+   boolean isLazyRedeliveryUpdate();
+
+   void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate);
+
    void createQueue(java.lang.String name) throws java.lang.Exception;
 
    void createTopic(java.lang.String name) throws java.lang.Exception;




More information about the jboss-cvs-commits mailing list