[jboss-cvs] JBoss Messaging SVN: r5992 - trunk/src/main/org/jboss/messaging/core/server/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 4 08:36:18 EST 2009
Author: timfox
Date: 2009-03-04 08:36:17 -0500 (Wed, 04 Mar 2009)
New Revision: 5992
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
partially re-instated force depage
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-03-04 12:58:25 UTC (rev 5991)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-03-04 13:36:17 UTC (rev 5992)
@@ -409,7 +409,7 @@
public void deliverReplicated(final long messageID) throws Exception
{
- MessageReference ref = messageQueue.removeReferenceWithID(messageID);
+ MessageReference ref = removeFirstReference(messageID);
if (ref == null)
{
@@ -431,7 +431,7 @@
handled);
}
}
-
+
public void failedOver()
{
if (messageQueue.consumerFailedOver())
@@ -456,6 +456,34 @@
// Public ---------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
+
+ private MessageReference removeFirstReference(final long id) throws Exception // removes the ref with this id from messageQueue, forcing one depage if it is not there yet
+ {
+ MessageReference ref = messageQueue.removeFirstReference(id);
+
+ if (ref == null)
+ {
+ // removeFirstReference returned null. The order is correct, but the message hasn't been depaged yet, so we need to force a depage
+ PagingStore store = pagingManager.getPageStore(binding.getAddress());
+
+ // Force a depage; note that only a single readPage() attempt is made here
+ if (!store.readPage()) // readPage() returns false if there are no pages
+ {
+ throw new IllegalStateException("Cannot find page");
+ }
+ else
+ {
+ ref = messageQueue.removeFirstReference(id); // retry the removal now that a page has been read
+
+ if (ref == null)
+ {
+ throw new IllegalStateException("Cannot find ref after depaging"); // still missing after the one depage attempt
+ }
+ }
+ }
+
+ return ref; // non-null on every path that reaches this point; the failure paths above throw
+ }
private void promptDelivery()
{
@@ -659,21 +687,21 @@
Packet replPacket = new SessionReplicateDeliveryMessage(id, message.getMessageID());
replPacket.setChannelID(channel.getID());
-// log.info("replicating delivery from live for queue " + messageQueue.getName() +
-// " ref " +
-// message.getMessageID() +
-// " session name " +
-// session.getName());
+ // log.info("replicating delivery from live for queue " + messageQueue.getName() +
+ // " ref " +
+ // message.getMessageID() +
+ // " session name " +
+ // session.getName());
replicatingChannel.replicatePacket(replPacket, replicatedSessionID, new Runnable()
{
public void run()
{
-// log.info("got replicate delivery response " + messageQueue.getName() +
-// " ref " +
-// message.getMessageID() +
-// " session name " +
-// session.getName());
+ // log.info("got replicate delivery response " + messageQueue.getName() +
+ // " ref " +
+ // message.getMessageID() +
+ // " session name " +
+ // session.getName());
channel.send(packet);
}
});
More information about the jboss-cvs-commits
mailing list