[jboss-cvs] JBossAS SVN: r70714 - in branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server: jmx and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 10 21:55:00 EDT 2008


Author: adrian at jboss.org
Date: 2008-03-10 21:55:00 -0400 (Mon, 10 Mar 2008)
New Revision: 70714

Modified:
   branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/BasicQueue.java
   branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java
   branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/MessageReference.java
   branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java
   branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java
Log:
[JBPAPP-663] [JBAPP-664] - Fixes for redelivery after a crash and race condition maintain unacknowledged messages

Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/BasicQueue.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/BasicQueue.java	2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/BasicQueue.java	2008-03-11 01:55:00 UTC (rev 70714)
@@ -211,40 +211,48 @@
       if (trace)
          log.trace("addReceiver " + sub + " " + this);
       
-      MessageReference found = null;
-      synchronized (messages)
+      synchronized(receivers)
       {
-         if (messages.size() != 0)
+         MessageReference found = null;
+         synchronized (messages)
          {
-            for (Iterator it = messages.iterator(); it.hasNext();)
+            if (messages.size() != 0)
             {
-               MessageReference message = (MessageReference) it.next();
-               try
+               for (Iterator it = messages.iterator(); it.hasNext();)
                {
-                  if (message.isExpired())
+                  MessageReference message = (MessageReference) it.next();
+                  try
                   {
-                     it.remove();
-                     expireMessageAsync(message);
+                     if (message.isExpired())
+                     {
+                        it.remove();
+                        expireMessageAsync(message);
+                     }
+                     else if (sub.accepts(message.getHeaders()))
+                     {
+                        //queue message for sending to this sub
+                        it.remove();
+                        found = message;
+                        break;
+                     }
                   }
-                  else if (sub.accepts(message.getHeaders()))
+                  catch (JMSException ignore)
                   {
-                     //queue message for sending to this sub
-                     it.remove();
-                     found = message;
-                     break;
+                     log.info("Caught unusual exception in addToReceivers.", ignore);
                   }
                }
-               catch (JMSException ignore)
-               {
-                  log.info("Caught unusual exception in addToReceivers.", ignore);
-               }
             }
+            if (found != null)
+               setupMessageAcknowledgement(sub, found);
          }
+         if (found != null)
+         {
+            updateRedeliveryFlags(found);
+            queueMessageForSending(sub, found);
+         }
+         else
+            addToReceivers(sub);
       }
-      if (found != null)
-         queueMessageForSending(sub, found);
-      else
-         addToReceivers(sub);
    }
 
    /**
@@ -473,13 +481,17 @@
 
       try
       {
+         // Set redelivered, vendor-specific flags
          message.redelivered();
-         // Set redelivered, vendor-specific flags
-         message.invalidate();
-         // Update the persistent message outside the transaction
-         // We want to know the message might have been delivered regardless
-         if (message.isPersistent())
-            server.getPersistenceManager().update(message, null);
+         // Old style update - doesn't survive a crash
+         if (server.isLazyRedeliveryUpdate())
+         {
+            message.invalidate();
+            // Update the persistent message outside the transaction
+            // We want to know the message might have been delivered regardless
+            if (message.isPersistent())
+               server.getPersistenceManager().update(message, null);
+         }
       }
       catch (JMSException e)
       {
@@ -664,6 +676,9 @@
                   else
                      break;
                }
+               
+               if (messageRef != null)
+                  setupMessageAcknowledgement(sub, messageRef);
             }
          }
          else
@@ -687,6 +702,9 @@
                      break;
                   }
                }
+               
+               if (messageRef != null)
+                  setupMessageAcknowledgement(sub, messageRef);
             }
          }
 
@@ -695,14 +713,11 @@
             if (wait)
                addToReceivers(sub);
          }
-         else
-         {
-            setupMessageAcknowledgement(sub, messageRef);
-         }
       }
 
       if (messageRef == null)
          return null;
+      updateRedeliveryFlags(messageRef);
       return messageRef.getMessageForDelivery();
    }
 
@@ -1090,9 +1105,9 @@
                }
             }
 
-            if (found == null)
+            synchronized (messages)
             {
-               synchronized (messages)
+               if (found == null)
                {
                   messages.add(message);
 
@@ -1103,12 +1118,19 @@
                      addTimeout(message, new ExpireMessageTask(message), message.messageExpiration);
                   }
                }
+               else
+               {
+                  setupMessageAcknowledgement(found, message);
+               }
             }
          }
          
          // Queue to the receiver
          if (found != null)
+         {
+            updateRedeliveryFlags(message);
             queueMessageForSending(found, message);
+         }
       }
       catch (JMSException e)
       {
@@ -1133,7 +1155,6 @@
 
       try
       {
-         setupMessageAcknowledgement(sub, message);
          RoutedMessage r = new RoutedMessage();
          r.message = message;
          r.subscriptionId = new Integer(sub.subscriptionId);
@@ -1159,19 +1180,38 @@
       nack.destination = message.getJMSDestination();
       nack.messageID = message.getJMSMessageID();
       nack.subscriberId = sub.subscriptionId;
+      
+      UnackedMessageInfo unacked = new UnackedMessageInfo(messageRef, sub);
+      unacknowledgedMessages.put(nack, unacked);
+      unackedByMessageRef.put(messageRef, nack);
+      HashMap map = (HashMap) unackedBySubscription.get(sub);
+      if (map == null)
+      {
+         map = new HashMap();
+         unackedBySubscription.put(sub, map);
+      }
+      map.put(messageRef, nack);
+   }
 
-      synchronized (messages)
+   /**
+    * Update redelivery flags
+    *
+    * @param messageRef the message to be acknowledged
+    * @throws JMSException for any error
+    */
+   protected void updateRedeliveryFlags(MessageReference messageRef) throws JMSException
+   {
+      // Need to update the message so it recovers from crash with redelivery=true 
+      if (server.isLazyRedeliveryUpdate() == false && messageRef.isPersistent())
       {
-         UnackedMessageInfo unacked = new UnackedMessageInfo(messageRef, sub);
-         unacknowledgedMessages.put(nack, unacked);
-         unackedByMessageRef.put(messageRef, nack);
-         HashMap map = (HashMap) unackedBySubscription.get(sub);
-         if (map == null)
-         {
-            map = new HashMap();
-            unackedBySubscription.put(sub, map);
-         }
-         map.put(messageRef, nack);
+         // Temporarily set the flags
+         messageRef.redelivered();
+         // Update the persistent message outside the transaction
+         // We want to know the message might have been delivered regardless
+         messageRef.invalidate();
+         server.getPersistenceManager().update(messageRef, null);
+         // Now revert the flags back again (but only in memory)
+         messageRef.revertRedelivered();
       }
    }
 

Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java	2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java	2008-03-11 01:55:00 UTC (rev 70714)
@@ -102,7 +102,7 @@
    private int lastTemporaryQueue = 1;
 
    private Object lastTemporaryQueueLock = new Object();
-
+   
    /** The security manager */
    private StateManager stateManager;
 
@@ -112,6 +112,9 @@
    /** The Cache Used to hold messages */
    private MessageCache messageCache;
 
+   /** Whether to do the redelivery update lazily */
+   private boolean lazyRedeliveryUpdate;
+
    private Object stateLock = new Object();
 
    private Object idLock = new Object();
@@ -168,6 +171,26 @@
    }
 
    /**
+    * Get the lazyRedeliveryUpdate.
+    * 
+    * @return the lazyRedeliveryUpdate.
+    */
+   public boolean isLazyRedeliveryUpdate()
+   {
+      return lazyRedeliveryUpdate;
+   }
+
+   /**
+    * Set the lazyRedeliveryUpdate.
+    * 
+    * @param lazyRedeliveryUpdate the lazyRedeliveryUpdate.
+    */
+   public void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate)
+   {
+      this.lazyRedeliveryUpdate = lazyRedeliveryUpdate;
+   }
+
+   /**
     * Returns <code>false</code> if the JMS server is currently running and
     * handling requests, <code>true</code> otherwise.
     *
@@ -410,7 +433,7 @@
       val.setJMSRedelivered(false);
       val.header.jmsProperties.remove(SpyMessage.PROPERTY_REDELIVERY_COUNT);
       val.header.jmsProperties.remove(SpyMessage.PROPERTY_DELIVERY_COUNT);
-      
+
       //Add the message to the queue
       val.setReadOnlyMode();
       queue.addMessage(val, txId);

Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/MessageReference.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/MessageReference.java	2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/MessageReference.java	2008-03-11 01:55:00 UTC (rev 70714)
@@ -180,7 +180,7 @@
    /**
     * The message is being redelivered
     */
-   public void redelivered() throws JMSException
+   public synchronized void redelivered() throws JMSException
    {
       this.redelivered = true;
 
@@ -202,6 +202,28 @@
    }
 
    /**
+    * Revert the redelivered status
+    */
+   public synchronized void revertRedelivered() throws JMSException
+   {
+      this.redelivered = true;
+
+      messageScheduledDelivery = 0;
+
+      --redeliveryCount;
+      if (redeliveryCount == 0)
+         redelivered = false;
+      
+      if (isLateClone() == false)
+      {
+         SpyMessage message = getMessage();
+         message.setJMSRedelivered(redelivered);
+
+         message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(redeliveryCount));
+      }
+   }
+
+   /**
     * Returns true if this message reference has expired.
     */
    public boolean isExpired()

Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java	2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java	2008-03-11 01:55:00 UTC (rev 70714)
@@ -81,6 +81,8 @@
    private ObjectName threadPool;
    /** Default expiry destination */
    private ObjectName expiryDestination;
+   /** Whether to do the redelivery update lazily */
+   private boolean lazyRedeliveryUpdate = false;
 
    /**
     * @jmx:managed-attribute
@@ -264,6 +266,26 @@
    }
 
    /**
+    * Get the lazyRedeliveryUpdate.
+    * 
+    * @return the lazyRedeliveryUpdate.
+    */
+   public boolean isLazyRedeliveryUpdate()
+   {
+      return lazyRedeliveryUpdate;
+   }
+
+   /**
+    * Set the lazyRedeliveryUpdate.
+    * 
+    * @param lazyRedeliveryUpdate the lazyRedeliveryUpdate.
+    */
+   public void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate)
+   {
+      this.lazyRedeliveryUpdate = lazyRedeliveryUpdate;
+   }
+
+   /**
     * @jmx:managed-operation
     */
    public void createQueue(String name) throws Exception
@@ -384,6 +406,7 @@
    {
       super.createService();
       jmsServer = new JMSDestinationManager(tempParameters);
+      jmsServer.setLazyRedeliveryUpdate(lazyRedeliveryUpdate);
    }
 
    protected void startService() throws Exception
@@ -394,7 +417,7 @@
             ServiceControllerMBean.class,
             ServiceControllerMBean.OBJECT_NAME,
             server);
-
+      
       PersistenceManager pm = (PersistenceManager) server.getAttribute(persistenceManager, "Instance");
       jmsServer.setPersistenceManager(pm);
 

Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java	2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java	2008-03-11 01:55:00 UTC (rev 70714)
@@ -131,6 +131,15 @@
     */
    public void setExpiryDestination(ObjectName destination);
 
+   /**
+    * Get the lazyRedeliveryUpdate.
+    * 
+    * @return the lazyRedeliveryUpdate.
+    */
+   boolean isLazyRedeliveryUpdate();
+
+   void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate);
+
    void createQueue(java.lang.String name) throws java.lang.Exception;
 
    void createTopic(java.lang.String name) throws java.lang.Exception;




More information about the jboss-cvs-commits mailing list