[jboss-cvs] JBoss Messaging SVN: r5628 - branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 13 19:57:44 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-13 19:57:44 -0500 (Tue, 13 Jan 2009)
New Revision: 5628

Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
tweaks

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-13 23:37:49 UTC (rev 5627)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-14 00:57:44 UTC (rev 5628)
@@ -26,7 +26,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -94,8 +93,6 @@
    private final int minLargeMessageSize;
 
    private final ServerSession session;
-   
-   private final Executor executor;
 
    private final Lock lock = new ReentrantLock();
 
@@ -136,8 +133,7 @@
                              final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                              final PostOffice postOffice,
                              final Channel channel,
-                             final boolean preAcknowledge,
-                             final Executor executor)
+                             final boolean preAcknowledge)
    {
       this.id = id;
 
@@ -146,8 +142,6 @@
       this.filter = filter;
 
       this.session = session;
-      
-      this.executor = executor;
 
       this.started = browseOnly || started;
 
@@ -489,26 +483,25 @@
 
    private void promptDelivery()
    {
-      if (largeMessageSender != null)
+      lock.lock();
+      try
       {
-         // If there is a LargeMessageSender, it needs to first resume & finalize sending the current message...
-         // and go for session.promptDelivery only after the message was finished.
-         // We also need to use an executor, or else the LargeMessageSend will be performed at the caller's thread
-         executor.execute(new Runnable(){
-
-            public void run()
+         if (largeMessageSender != null)
+         {
+            if (largeMessageSender.sendLargeMessage())
             {
-               if (largeMessageSender.sendLargeMessage())
-               {
-                  // prompt Delivery only if chunk was finished
-                  session.promptDelivery(messageQueue);
-               }
+               // prompt Delivery only if chunk was finished
+               session.promptDelivery(messageQueue);
             }
-         });
+         }
+         else
+         {
+            session.promptDelivery(messageQueue);
+         }
       }
-      else
+      finally
       {
-         session.promptDelivery(messageQueue);
+         lock.unlock();
       }
    }
 
@@ -555,6 +548,8 @@
          // This has to be checked inside the lock as the set to null is done inside the lock
          if (largeMessageSender != null)
          {
+            log.info("doHandle: LargeMessageSender != null," + messageQueue.isBackup() +
+                     " can't send another message while send is pending", new Exception("trace"));
             if (trace)
             {
                trace("doHandle: LargeMessageSender != null, can't send another message while send is pending");
@@ -607,7 +602,8 @@
       // TODO: Should we block until the replication is done?
       channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
 
-      // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with ordering and flow control
+      // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with
+      // ordering and flow control
       largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
       largeMessageSender.sendLargeMessage();
 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-13 23:37:49 UTC (rev 5627)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-14 00:57:44 UTC (rev 5628)
@@ -990,19 +990,20 @@
    {
       DelayedResult result = channel.replicatePacket(packet);
 
-      if (result == null)
+      try
       {
-         try
-         {
-            consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
-         }
-         catch (Exception e)
-         {
-            log.error("Failed to receive credits", e);
-         }
+         // Note we don't wait for response before handling this
 
-         channel.confirm(packet);
+         consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to receive credits", e);
+      }
 
+      if (result == null)
+      {
+         channel.confirm(packet);
       }
       else
       {
@@ -1010,16 +1011,7 @@
          {
             public void run()
             {
-               try
-               {
-                  consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
-               }
-               catch (Exception e)
-               {
-                  log.error("Failed to receive credits", e);
-               }
                channel.confirm(packet);
-
             }
          });
       }
@@ -1309,8 +1301,7 @@
                                                           queueSettingsRepository,
                                                           postOffice,
                                                           channel,
-                                                          preAcknowledge,
-                                                          this.executor);
+                                                          preAcknowledge);
 
          consumers.put(consumer.getID(), consumer);
 




More information about the jboss-cvs-commits mailing list