[jboss-cvs] JBoss Messaging SVN: r5627 - branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 13 18:37:49 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-13 18:37:49 -0500 (Tue, 13 Jan 2009)
New Revision: 5627

Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Change on SendLargeMessage

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-13 22:56:14 UTC (rev 5626)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-13 23:37:49 UTC (rev 5627)
@@ -26,7 +26,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -94,6 +94,8 @@
    private final int minLargeMessageSize;
 
    private final ServerSession session;
+   
+   private final Executor executor;
 
    private final Lock lock = new ReentrantLock();
 
@@ -134,7 +136,8 @@
                              final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                              final PostOffice postOffice,
                              final Channel channel,
-                             final boolean preAcknowledge)
+                             final boolean preAcknowledge,
+                             final Executor executor)
    {
       this.id = id;
 
@@ -143,6 +146,8 @@
       this.filter = filter;
 
       this.session = session;
+      
+      this.executor = executor;
 
       this.started = browseOnly || started;
 
@@ -486,11 +491,20 @@
    {
       if (largeMessageSender != null)
       {
-         if (largeMessageSender.sendLargeMessage())
-         {
-            // prompt Delivery only if chunk was finished
-            session.promptDelivery(messageQueue);
-         }
+         // If there is a LargeMessageSender, it needs to first resume & finalize sending the current message...
+         // and go for session.promptDelivery only after the message was finished.
+         // We also need to use an executor, or else the LargeMessageSend will be performed at the caller's thread
+         executor.execute(new Runnable(){
+
+            public void run()
+            {
+               if (largeMessageSender.sendLargeMessage())
+               {
+                  // prompt Delivery only if chunk was finished
+                  session.promptDelivery(messageQueue);
+               }
+            }
+         });
       }
       else
       {
@@ -593,7 +607,7 @@
       // TODO: Should we block until the replication is done?
       channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
 
-      // SendLargeMessage has to be done on the same thread used on the QueueImpl, or we would have problems.
+      // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with ordering and flow control
       largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
       largeMessageSender.sendLargeMessage();
 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-13 22:56:14 UTC (rev 5626)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-13 23:37:49 UTC (rev 5627)
@@ -1309,7 +1309,8 @@
                                                           queueSettingsRepository,
                                                           postOffice,
                                                           channel,
-                                                          preAcknowledge);
+                                                          preAcknowledge,
+                                                          this.executor);
 
          consumers.put(consumer.getID(), consumer);
 




More information about the jboss-cvs-commits mailing list