[jboss-cvs] JBossAS SVN: r70714 - in branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server: jmx and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 10 21:55:00 EDT 2008
Author: adrian at jboss.org
Date: 2008-03-10 21:55:00 -0400 (Mon, 10 Mar 2008)
New Revision: 70714
Modified:
branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/BasicQueue.java
branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java
branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/MessageReference.java
branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java
branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java
Log:
[JBPAPP-663] [JBPAPP-664] - Fixes for redelivery after a crash and for a race condition in maintaining unacknowledged messages
Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/BasicQueue.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/BasicQueue.java 2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/BasicQueue.java 2008-03-11 01:55:00 UTC (rev 70714)
@@ -211,40 +211,48 @@
if (trace)
log.trace("addReceiver " + sub + " " + this);
- MessageReference found = null;
- synchronized (messages)
+ synchronized(receivers)
{
- if (messages.size() != 0)
+ MessageReference found = null;
+ synchronized (messages)
{
- for (Iterator it = messages.iterator(); it.hasNext();)
+ if (messages.size() != 0)
{
- MessageReference message = (MessageReference) it.next();
- try
+ for (Iterator it = messages.iterator(); it.hasNext();)
{
- if (message.isExpired())
+ MessageReference message = (MessageReference) it.next();
+ try
{
- it.remove();
- expireMessageAsync(message);
+ if (message.isExpired())
+ {
+ it.remove();
+ expireMessageAsync(message);
+ }
+ else if (sub.accepts(message.getHeaders()))
+ {
+ //queue message for sending to this sub
+ it.remove();
+ found = message;
+ break;
+ }
}
- else if (sub.accepts(message.getHeaders()))
+ catch (JMSException ignore)
{
- //queue message for sending to this sub
- it.remove();
- found = message;
- break;
+ log.info("Caught unusual exception in addToReceivers.", ignore);
}
}
- catch (JMSException ignore)
- {
- log.info("Caught unusual exception in addToReceivers.", ignore);
- }
}
+ if (found != null)
+ setupMessageAcknowledgement(sub, found);
}
+ if (found != null)
+ {
+ updateRedeliveryFlags(found);
+ queueMessageForSending(sub, found);
+ }
+ else
+ addToReceivers(sub);
}
- if (found != null)
- queueMessageForSending(sub, found);
- else
- addToReceivers(sub);
}
/**
@@ -473,13 +481,17 @@
try
{
+ // Set redelivered, vendor-specific flags
message.redelivered();
- // Set redelivered, vendor-specific flags
- message.invalidate();
- // Update the persistent message outside the transaction
- // We want to know the message might have been delivered regardless
- if (message.isPersistent())
- server.getPersistenceManager().update(message, null);
+ // Old style update - doesn't survive a crash
+ if (server.isLazyRedeliveryUpdate())
+ {
+ message.invalidate();
+ // Update the persistent message outside the transaction
+ // We want to know the message might have been delivered regardless
+ if (message.isPersistent())
+ server.getPersistenceManager().update(message, null);
+ }
}
catch (JMSException e)
{
@@ -664,6 +676,9 @@
else
break;
}
+
+ if (messageRef != null)
+ setupMessageAcknowledgement(sub, messageRef);
}
}
else
@@ -687,6 +702,9 @@
break;
}
}
+
+ if (messageRef != null)
+ setupMessageAcknowledgement(sub, messageRef);
}
}
@@ -695,14 +713,11 @@
if (wait)
addToReceivers(sub);
}
- else
- {
- setupMessageAcknowledgement(sub, messageRef);
- }
}
if (messageRef == null)
return null;
+ updateRedeliveryFlags(messageRef);
return messageRef.getMessageForDelivery();
}
@@ -1090,9 +1105,9 @@
}
}
- if (found == null)
+ synchronized (messages)
{
- synchronized (messages)
+ if (found == null)
{
messages.add(message);
@@ -1103,12 +1118,19 @@
addTimeout(message, new ExpireMessageTask(message), message.messageExpiration);
}
}
+ else
+ {
+ setupMessageAcknowledgement(found, message);
+ }
}
}
// Queue to the receiver
if (found != null)
+ {
+ updateRedeliveryFlags(message);
queueMessageForSending(found, message);
+ }
}
catch (JMSException e)
{
@@ -1133,7 +1155,6 @@
try
{
- setupMessageAcknowledgement(sub, message);
RoutedMessage r = new RoutedMessage();
r.message = message;
r.subscriptionId = new Integer(sub.subscriptionId);
@@ -1159,19 +1180,38 @@
nack.destination = message.getJMSDestination();
nack.messageID = message.getJMSMessageID();
nack.subscriberId = sub.subscriptionId;
+
+ UnackedMessageInfo unacked = new UnackedMessageInfo(messageRef, sub);
+ unacknowledgedMessages.put(nack, unacked);
+ unackedByMessageRef.put(messageRef, nack);
+ HashMap map = (HashMap) unackedBySubscription.get(sub);
+ if (map == null)
+ {
+ map = new HashMap();
+ unackedBySubscription.put(sub, map);
+ }
+ map.put(messageRef, nack);
+ }
- synchronized (messages)
+ /**
+ * Update redelivery flags
+ *
+ * @param messageRef the message to be acknowledged
+ * @throws JMSException for any error
+ */
+ protected void updateRedeliveryFlags(MessageReference messageRef) throws JMSException
+ {
+ // Need to update the message so it recovers from crash with redelivery=true
+ if (server.isLazyRedeliveryUpdate() == false && messageRef.isPersistent())
{
- UnackedMessageInfo unacked = new UnackedMessageInfo(messageRef, sub);
- unacknowledgedMessages.put(nack, unacked);
- unackedByMessageRef.put(messageRef, nack);
- HashMap map = (HashMap) unackedBySubscription.get(sub);
- if (map == null)
- {
- map = new HashMap();
- unackedBySubscription.put(sub, map);
- }
- map.put(messageRef, nack);
+ // Temporarily set the flags
+ messageRef.redelivered();
+ // Update the persistent message outside the transaction
+ // We want to know the message might have been delivered regardless
+ messageRef.invalidate();
+ server.getPersistenceManager().update(messageRef, null);
+ // Now revert the flags back again (but only in memory)
+ messageRef.revertRedelivered();
}
}
Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java 2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/JMSDestinationManager.java 2008-03-11 01:55:00 UTC (rev 70714)
@@ -102,7 +102,7 @@
private int lastTemporaryQueue = 1;
private Object lastTemporaryQueueLock = new Object();
-
+
/** The security manager */
private StateManager stateManager;
@@ -112,6 +112,9 @@
/** The Cache Used to hold messages */
private MessageCache messageCache;
+ /** Whether to do the redelivery update lazily */
+ private boolean lazyRedeliveryUpdate;
+
private Object stateLock = new Object();
private Object idLock = new Object();
@@ -168,6 +171,26 @@
}
/**
+ * Get the lazyRedeliveryUpdate.
+ *
+ * @return the lazyRedeliveryUpdate.
+ */
+ public boolean isLazyRedeliveryUpdate()
+ {
+ return lazyRedeliveryUpdate;
+ }
+
+ /**
+ * Set the lazyRedeliveryUpdate.
+ *
+ * @param lazyRedeliveryUpdate the lazyRedeliveryUpdate.
+ */
+ public void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate)
+ {
+ this.lazyRedeliveryUpdate = lazyRedeliveryUpdate;
+ }
+
+ /**
* Returns <code>false</code> if the JMS server is currently running and
* handling requests, <code>true</code> otherwise.
*
@@ -410,7 +433,7 @@
val.setJMSRedelivered(false);
val.header.jmsProperties.remove(SpyMessage.PROPERTY_REDELIVERY_COUNT);
val.header.jmsProperties.remove(SpyMessage.PROPERTY_DELIVERY_COUNT);
-
+
//Add the message to the queue
val.setReadOnlyMode();
queue.addMessage(val, txId);
Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/MessageReference.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/MessageReference.java 2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/MessageReference.java 2008-03-11 01:55:00 UTC (rev 70714)
@@ -180,7 +180,7 @@
/**
* The message is being redelivered
*/
- public void redelivered() throws JMSException
+ public synchronized void redelivered() throws JMSException
{
this.redelivered = true;
@@ -202,6 +202,28 @@
}
/**
+ * Revert the redelivered status
+ */
+ public synchronized void revertRedelivered() throws JMSException
+ {
+ this.redelivered = true;
+
+ messageScheduledDelivery = 0;
+
+ --redeliveryCount;
+ if (redeliveryCount == 0)
+ redelivered = false;
+
+ if (isLateClone() == false)
+ {
+ SpyMessage message = getMessage();
+ message.setJMSRedelivered(redelivered);
+
+ message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(redeliveryCount));
+ }
+ }
+
+ /**
* Returns true if this message reference has expired.
*/
public boolean isExpired()
Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java 2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManager.java 2008-03-11 01:55:00 UTC (rev 70714)
@@ -81,6 +81,8 @@
private ObjectName threadPool;
/** Default expiry destination */
private ObjectName expiryDestination;
+ /** Whether to do the redelivery update lazily */
+ private boolean lazyRedeliveryUpdate = false;
/**
* @jmx:managed-attribute
@@ -264,6 +266,26 @@
}
/**
+ * Get the lazyRedeliveryUpdate.
+ *
+ * @return the lazyRedeliveryUpdate.
+ */
+ public boolean isLazyRedeliveryUpdate()
+ {
+ return lazyRedeliveryUpdate;
+ }
+
+ /**
+ * Set the lazyRedeliveryUpdate.
+ *
+ * @param lazyRedeliveryUpdate the lazyRedeliveryUpdate.
+ */
+ public void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate)
+ {
+ this.lazyRedeliveryUpdate = lazyRedeliveryUpdate;
+ }
+
+ /**
* @jmx:managed-operation
*/
public void createQueue(String name) throws Exception
@@ -384,6 +406,7 @@
{
super.createService();
jmsServer = new JMSDestinationManager(tempParameters);
+ jmsServer.setLazyRedeliveryUpdate(lazyRedeliveryUpdate);
}
protected void startService() throws Exception
@@ -394,7 +417,7 @@
ServiceControllerMBean.class,
ServiceControllerMBean.OBJECT_NAME,
server);
-
+
PersistenceManager pm = (PersistenceManager) server.getAttribute(persistenceManager, "Instance");
jmsServer.setPersistenceManager(pm);
Modified: branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java
===================================================================
--- branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java 2008-03-11 01:19:27 UTC (rev 70713)
+++ branches/JBPAPP_4_2_0_GA_CP/messaging/src/main/org/jboss/mq/server/jmx/DestinationManagerMBean.java 2008-03-11 01:55:00 UTC (rev 70714)
@@ -131,6 +131,15 @@
*/
public void setExpiryDestination(ObjectName destination);
+ /**
+ * Get the lazyRedeliveryUpdate.
+ *
+ * @return the lazyRedeliveryUpdate.
+ */
+ boolean isLazyRedeliveryUpdate();
+
+ void setLazyRedeliveryUpdate(boolean lazyRedeliveryUpdate);
+
void createQueue(java.lang.String name) throws java.lang.Exception;
void createTopic(java.lang.String name) throws java.lang.Exception;
More information about the jboss-cvs-commits
mailing list