[jboss-cvs] JBoss Messaging SVN: r5640 - in branches/Branch_Failover_Page/src/main/org/jboss/messaging/core: server/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 14 17:38:58 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-14 17:38:58 -0500 (Wed, 14 Jan 2009)
New Revision: 5640

Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
Tweaks & comments

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-01-14 15:05:45 UTC (rev 5639)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-01-14 22:38:58 UTC (rev 5640)
@@ -476,24 +476,26 @@
 
          if (creditsToSend >= clientWindowSize)
          {
-            final int credits = creditsToSend;
 
-            creditsToSend = 0;
-
             if (useExecutor)
             {
-               sessionExecutor.execute(new Runnable(){
-   
+               final int credits = creditsToSend;
+
+               creditsToSend = 0;
+               sessionExecutor.execute(new Runnable()
+               {
+
                   public void run()
                   {
                      channel.send(new SessionConsumerFlowCreditMessage(id, credits));
                   }
-                  
+
                });
             }
             else
             {
-               channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+               channel.send(new SessionConsumerFlowCreditMessage(id, creditsToSend));
+               creditsToSend = 0;
             }
          }
       }
@@ -624,7 +626,7 @@
          {
             channel.sendBlocking(new SessionConsumerCloseMessage(id));
          }
-         
+
          clearBuffer();
       }
       finally
@@ -632,16 +634,16 @@
          session.removeConsumer(this);
       }
    }
-   
+
    private void clearBuffer()
    {
       if (isFileConsumer())
       {
-         for (ClientMessage message: buffer)
+         for (ClientMessage message : buffer)
          {
             if (message instanceof ClientFileMessage)
             {
-               ((ClientFileMessage) message).getFile().delete();
+               ((ClientFileMessage)message).getFile().delete();
             }
          }
       }
@@ -691,7 +693,7 @@
                                                   "-" +
                                                   getID() +
                                                   ".jbm"));
-         
+
          cloneMessage.setFlowControlSize(message.getFlowControlSize());
 
          addBytesBody(cloneMessage, message.getBody().array());
@@ -712,7 +714,7 @@
          if (!directory.exists())
          {
             boolean ok = directory.mkdirs();
-            
+
             if (!ok)
             {
                throw new IOException("Failed to create directory " + directory.getCanonicalPath());

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-14 15:05:45 UTC (rev 5639)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-14 22:38:58 UTC (rev 5640)
@@ -103,7 +103,7 @@
 
    private boolean started;
 
-   private volatile LargeMessageSender largeMessageSender = null;
+   private volatile LargeMessageDeliverer largeMessageDeliverer = null;
 
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgements or being started/stopped by the session.
@@ -489,7 +489,7 @@
 
    private void promptDelivery()
    {
-      if (largeMessageSender != null)
+      if (largeMessageDeliverer != null)
       {
          resumeLargeMessage();
       }
@@ -506,15 +506,15 @@
    {
       if (messageQueue.isBackup())
       {
-         // We are supposed to finish largeMessageSender, or use all the possible credits before we return this method.
+         // We must finish largeMessageDeliverer, or use up all the available credits, before returning from this method.
          // If the commands were played in a different order than they were generated on the live node, we could
          // still be running this largeMessage when the next message arrives, which would cause messages
          // from the cluster to be rejected
-         largeMessageSender.resumeLargeMessageRunnable.run();
+         largeMessageDeliverer.deliver();
       }
       else
       {
-         executor.execute(largeMessageSender.resumeLargeMessageRunnable);
+         executor.execute(largeMessageDeliverer.resumeLargeMessageRunnable);
       }
    }
 
@@ -559,11 +559,11 @@
 
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
-         if (largeMessageSender != null)
+         if (largeMessageDeliverer != null)
          {
             if (trace)
             {
-               trace("doHandle: LargeMessageSender != null, can't send another message while send is pending");
+               trace("doHandle: LargeMessageDeliverer != null, can't send another message while send is pending");
             }
             return HandleStatus.BUSY;
          }
@@ -593,11 +593,11 @@
 
          if (message.isLargeMessage())
          {
-            sendLargeMessage(ref, message);
+            deliverLargeMessage(ref, message);
          }
          else
          {
-            sendStandardMessage(ref, message);
+            deliverStandardMessage(ref, message);
          }
 
          return HandleStatus.HANDLED;
@@ -608,23 +608,45 @@
       }
    }
 
-   private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
+   private void deliverLargeMessage(final MessageReference ref, final ServerMessage message)
    {
-      // TODO: Should we block until the replication is done?
+      largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
+
       channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
 
-      // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with
-      // ordering and flow control
-      largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
-      largeMessageSender.sendLargeMessage();
+      // TODO: Should we block until the replication is done?
+//      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
+//      if (result != null)
+//      {
+//         final CountDownLatch latch = new CountDownLatch(1);
+//         result.setResultRunner(new Runnable()
+//         {
+//            public void run()
+//            {
+//               latch.countDown();
+//            }
+//            
+//         });
+//         try
+//         {
+//            latch.await();
+//         }
+//         catch (InterruptedException ignored)
+//         {
+//         }
+//      }
 
+      // deliverLargeMessage has to run on the same thread used by the QueueImpl, or we would have problems with flow control credits:
+      // credits could arrive while the delivery is still in progress, which would interrupt the flow control
+      largeMessageDeliverer.deliver();
+   
    }
 
    /**
     * @param ref
     * @param message
     */
-   private void sendStandardMessage(final MessageReference ref, final ServerMessage message)
+   private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
    {
       if (availableCredits != null)
       {
@@ -661,7 +683,7 @@
 
    /** Internal encapsulation of the logic on sending LargeMessages.
     *  This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
-   private class LargeMessageSender
+   private class LargeMessageDeliverer
    {
       private final long sizePendingLargeMessage;
 
@@ -679,7 +701,7 @@
       {
          public void run()
          {
-            if (sendLargeMessage())
+            if (deliver())
             {
                // prompt Delivery only if chunk was finished
                session.promptDelivery(messageQueue);
@@ -687,7 +709,7 @@
          }
       };
 
-      public LargeMessageSender(final LargeServerMessage message, final MessageReference ref)
+      public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref)
       {
          pendingLargeMessage = message;
 
@@ -696,7 +718,7 @@
          this.ref = ref;
       }
 
-      public boolean sendLargeMessage()
+      public boolean deliver()
       {
          lock.lock();
 
@@ -716,7 +738,7 @@
             {
                if (trace)
                {
-                  trace("sendLargeMessage:: sending initialMessage, backup = " + messageQueue.isBackup());
+                  trace("deliverLargeMessage:: sending initialMessage, backup = " + messageQueue.isBackup());
                }
                sentFirstMessage = true;
 
@@ -736,7 +758,7 @@
                   availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
                   if (trace)
                   {
-                     trace("sendLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
+                     trace("deliverLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
                            " credits, current = " +
                            availableCredits +
                            " isBackup = " +
@@ -749,7 +771,7 @@
             {
                if (trace)
                {
-                  trace("sendLargeMessage: Summarizing sendLargeMessage, currentPosition = " + positionPendingLargeMessage);
+                  trace("deliverLargeMessage: Summarizing deliverLargeMessage, currentPosition = " + positionPendingLargeMessage);
                }
             }
 
@@ -759,7 +781,7 @@
                {
                   if (trace)
                   {
-                     trace("sendLargeMessage: Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
+                     trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
                   }
                   return false;
                }
@@ -775,7 +797,7 @@
 
                if (trace)
                {
-                  trace("sendLargeMessage: Sending " + chunk.getRequiredBufferSize() +
+                  trace("deliverLargeMessage: Sending " + chunk.getRequiredBufferSize() +
                         " availableCredits now is " +
                         availableCredits +
                         " isBackup = " +
@@ -789,12 +811,12 @@
 
             if (trace)
             {
-               trace("Finished sendLargeMessage isBackup = " + messageQueue.isBackup());
+               trace("Finished deliverLargeMessage isBackup = " + messageQueue.isBackup());
             }
 
             pendingLargeMessage.releaseResources();
 
-            largeMessageSender = null;
+            largeMessageDeliverer = null;
 
             return true;
          }




More information about the jboss-cvs-commits mailing list