[jboss-cvs] JBoss Messaging SVN: r8345 - branches/Branch_1_4/src/main/org/jboss/messaging/core/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 15 23:54:36 EDT 2011
Author: gaohoward
Date: 2011-06-15 23:54:36 -0400 (Wed, 15 Jun 2011)
New Revision: 8345
Modified:
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
Log:
JBMESSAGING-1879
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2011-06-16 03:43:01 UTC (rev 8344)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2011-06-16 03:54:36 UTC (rev 8345)
@@ -114,6 +114,8 @@
private volatile boolean stopServerPeerOnDBFailure = false;
+ private Object moveLock = new Object();
+
// Constructors --------------------------------------------------
public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -1692,8 +1694,10 @@
class MoveReferenceRunner extends JDBCTxRunner2
{
+ private int counter = 0;
public Object doTransaction() throws Exception
{
+ counter++;
PreparedStatement psReference = null;
PreparedStatement ps = null;
@@ -1732,7 +1736,22 @@
}
else
{
- log.info("Already moved message " + ref.getMessage().getMessageID());
+ log.info("Already moved message " + ref.getMessage().getMessageID() + " counter " + counter);
+ //there are two possible cases:
+ //1. last db retry failed but the updated actually successful.
+ //2. message claimed and re-sucked, but the first suck get in between the second suck
+ //having updated to 'S' state and doing this update, giving the first suck a
+ //chance to successfully update it, then the resucked one get here-- this results in
+ //duplicated messages!!!! This case only happens in using multiple consumers on a
+ //distributed queue case!
+ //to solve this we need a suckerLock object to serialize the update and a counter
+ //if counter > 1 and we get here, that's fine
+ //if counter = 1 and we get there, throw exception!
+ //JBMESSAGING-1879
+ if (counter == 1)
+ {
+ throw new JMSException("Failed to move message " + ref.getMessage().getMessageID() + ", already sucked.");
+ }
}
}
else
@@ -1751,8 +1770,11 @@
}
}
}
-
- new MoveReferenceRunner().executeWithRetry();
+
+ synchronized(moveLock)
+ {
+ new MoveReferenceRunner().executeWithRetry();
+ }
}
public void updateDeliveryCount(final long channelID,
@@ -3563,7 +3585,10 @@
log.trace("Message in suck claimed " + rows + " rows for message " + msgId);
}
- msgIDs.add(new ReferenceInfo(msgId, deliveryCount, sched));
+ if (rows == 1)
+ {
+ msgIDs.add(new ReferenceInfo(msgId, deliveryCount, sched));
+ }
}
}
finally
More information about the jboss-cvs-commits
mailing list