[jboss-cvs] JBoss Messaging SVN: r5630 - branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 13 21:28:22 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-13 21:28:21 -0500 (Tue, 13 Jan 2009)
New Revision: 5630

Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
tweaks

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-14 01:12:44 UTC (rev 5629)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-14 02:28:21 UTC (rev 5630)
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -94,6 +95,8 @@
 
    private final ServerSession session;
 
+   private final Executor executor;
+
    private final Lock lock = new ReentrantLock();
 
    private AtomicInteger availableCredits = new AtomicInteger(0);
@@ -133,7 +136,8 @@
                              final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                              final PostOffice postOffice,
                              final Channel channel,
-                             final boolean preAcknowledge)
+                             final boolean preAcknowledge,
+                             final Executor executor)
    {
       this.id = id;
 
@@ -143,6 +147,8 @@
 
       this.session = session;
 
+      this.executor = executor;
+
       this.started = browseOnly || started;
 
       this.browseOnly = browseOnly;
@@ -485,11 +491,7 @@
    {
       if (largeMessageSender != null)
       {
-         if (largeMessageSender.sendLargeMessage())
-         {
-            // prompt Delivery only if chunk was finished
-            session.promptDelivery(messageQueue);
-         }
+         resumeLargeMessage();
       }
       else
       {
@@ -497,6 +499,25 @@
       }
    }
 
+   /**
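+    * Resumes the pending large message: runs inline on a backup queue, otherwise hands it to the executor.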
+    */
+   private void resumeLargeMessage()
+   {
+      if (messageQueue.isBackup())
+      {
+         // We must finish the largeMessageSender, or use up all the available credits, before returning from this method.
+         // If the commands were replayed in a different order from the one in which they were generated on the live
+         // node, we could still be sending this large message when the next message arrives, which would cause
+         // messages from the cluster to be rejected.
+         largeMessageSender.resumeLargeMessageRunnable.run();
+      }
+      else
+      {
+         executor.execute(largeMessageSender.resumeLargeMessageRunnable);
+      }
+   }
+
    private HandleStatus doHandle(final MessageReference ref) throws Exception
    {
       if (availableCredits != null && availableCredits.get() <= 0)
@@ -540,8 +561,6 @@
          // This has to be checked inside the lock as the set to null is done inside the lock
          if (largeMessageSender != null)
          {
-            log.info("doHandle: LargeMessageSender != null," + messageQueue.isBackup() +
-                     " can't send another message while send is pending", new Exception("trace"));
             if (trace)
             {
                trace("doHandle: LargeMessageSender != null, can't send another message while send is pending");
@@ -656,6 +675,18 @@
       /** The current position on the message being processed */
       private volatile long positionPendingLargeMessage;
 
+      final Runnable resumeLargeMessageRunnable = new Runnable()
+      {
+         public void run()
+         {
+            if (sendLargeMessage())
+            {
+               // prompt Delivery only if chunk was finished
+               session.promptDelivery(messageQueue);
+            }
+         }
+      };
+
       public LargeMessageSender(final LargeServerMessage message, final MessageReference ref)
       {
          pendingLargeMessage = message;

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-14 01:12:44 UTC (rev 5629)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-14 02:28:21 UTC (rev 5630)
@@ -1301,7 +1301,8 @@
                                                           queueSettingsRepository,
                                                           postOffice,
                                                           channel,
-                                                          preAcknowledge);
+                                                          preAcknowledge,
+                                                          executor);
 
          consumers.put(consumer.getID(), consumer);
 




More information about the jboss-cvs-commits mailing list