[jboss-cvs] JBoss Messaging SVN: r5992 - trunk/src/main/org/jboss/messaging/core/server/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 4 08:36:18 EST 2009


Author: timfox
Date: 2009-03-04 08:36:17 -0500 (Wed, 04 Mar 2009)
New Revision: 5992

Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
partially re-instated force depage

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-03-04 12:58:25 UTC (rev 5991)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-03-04 13:36:17 UTC (rev 5992)
@@ -409,7 +409,7 @@
 
    public void deliverReplicated(final long messageID) throws Exception
    {
-      MessageReference ref = messageQueue.removeReferenceWithID(messageID);
+      MessageReference ref = removeFirstReference(messageID);
 
       if (ref == null)
       {
@@ -431,7 +431,7 @@
                                          handled);
       }
    }
-
+   
    public void failedOver()
    {
       if (messageQueue.consumerFailedOver())
@@ -456,6 +456,34 @@
    // Public ---------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
+
+   private MessageReference removeFirstReference(final long id) throws Exception
+   {
+      MessageReference ref = messageQueue.removeFirstReference(id);
+
+      if (ref == null)
+      {
+         // The order is correct, but it hasn't been depaged yet, so we need to force a depage
+         PagingStore store = pagingManager.getPageStore(binding.getAddress());
+         
+         // force a depage
+         if (!store.readPage()) // This returns false if there are no pages
+         {
+            throw new IllegalStateException("Cannot find page");
+         }
+         else
+         {
+            ref = messageQueue.removeFirstReference(id);
+            
+            if (ref == null)
+            {
+               throw new IllegalStateException("Cannot find ref after depaging");
+            }
+         }
+      }
+
+      return ref;
+   }
    
    private void promptDelivery()
    {
@@ -659,21 +687,21 @@
          Packet replPacket = new SessionReplicateDeliveryMessage(id, message.getMessageID());
          replPacket.setChannelID(channel.getID());
 
-//         log.info("replicating delivery from live for queue " + messageQueue.getName() +
-//                  " ref " +
-//                  message.getMessageID() +
-//                  " session name " +
-//                  session.getName());
+         // log.info("replicating delivery from live for queue " + messageQueue.getName() +
+         // " ref " +
+         // message.getMessageID() +
+         // " session name " +
+         // session.getName());
 
          replicatingChannel.replicatePacket(replPacket, replicatedSessionID, new Runnable()
          {
             public void run()
             {
-//               log.info("got replicate delivery response " + messageQueue.getName() +
-//                        " ref " +
-//                        message.getMessageID() +
-//                        " session name " +
-//                        session.getName());
+               // log.info("got replicate delivery response " + messageQueue.getName() +
+               // " ref " +
+               // message.getMessageID() +
+               // " session name " +
+               // session.getName());
                channel.send(packet);
             }
          });




More information about the jboss-cvs-commits mailing list