[jboss-cvs] JBoss Messaging SVN: r6591 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 27 23:04:35 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-04-27 23:04:34 -0400 (Mon, 27 Apr 2009)
New Revision: 6591

Added:
   trunk/tests/src/org/jboss/messaging/tests/unit/ra/
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1573 - Making standard messages also use the packet size

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-28 03:02:03 UTC (rev 6590)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-28 03:04:34 UTC (rev 6591)
@@ -377,6 +377,11 @@
       ClientMessageInternal messageToHandle = message;
 
       messageToHandle.onReceipt(this);
+      
+      if (trace)
+      {
+         log.trace("Adding message " + message + " into buffer");
+      }
 
       // Add it to the buffer
       buffer.addLast(messageToHandle, messageToHandle.getPriority());
@@ -501,12 +506,20 @@
             {
                if (clientWindowSize == 0)
                {
+                  if (trace)
+                  {
+                     log.trace("Sending full credits - 1 for slow consumer");
+                  }
                   // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
                   // always buffering one after received the first message
                   sendCredits(creditsToSend - 1);
                }
                else
                {
+                  if (trace)
+                  {
+                     log.trace("Sending full credits");
+                  }
                   sendCredits(creditsToSend);
                }
                creditsToSend = 0;
@@ -537,6 +550,10 @@
 
    private void queueExecutor()
    {
+      if (trace)
+      {
+         log.trace("Adding Runner on Executor for delivery");
+      }
       sessionExecutor.execute(runner);
    }
 
@@ -545,6 +562,10 @@
     */
    private void sendCredits(final int credits)
    {
+      if (trace)
+      {
+         log.trace("Sending " + credits + " back");
+      }
       channel.send(new SessionConsumerFlowCreditMessage(id, credits));
    }
 
@@ -619,8 +640,17 @@
             {
                onMessageThread = Thread.currentThread();
 
+               if (trace)
+               {
+                  log.trace("Calling handler.onMessage");
+               }
                theHandler.onMessage(message);
                
+               if (trace)
+               {
+                  log.trace("Handler.onMessage done");
+               }
+               
                if (message.isLargeMessage())
                {
                   message.discardLargeBody();
@@ -634,6 +664,10 @@
             // If slow consumer, we need to send 1 credit to make sure we get another message
             if (clientWindowSize == 0)
             {
+               if (trace)
+               {
+                  log.trace("Sending 1 credit back after consuming message on slow consumer (no-buffering)");
+               }
                sendCredits(1);
             }
          }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-04-28 03:02:03 UTC (rev 6590)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-04-28 03:04:34 UTC (rev 6591)
@@ -636,7 +636,7 @@
       {
          ClientMessageInternal clMessage = message.getClientMessage();
 
-         clMessage.setFlowControlSize(clMessage.getEncodeSize());
+         clMessage.setFlowControlSize(message.getPacketSize());
 
          consumer.handleMessage(message.getClientMessage());
       }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-04-28 03:02:03 UTC (rev 6590)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-04-28 03:04:34 UTC (rev 6591)
@@ -77,8 +77,7 @@
 
    // Static ---------------------------------------------------------------------------------------
 
-   // private static final boolean trace = log.isTraceEnabled();
-   private static final boolean trace = false;
+   private static final boolean trace = log.isTraceEnabled();
 
    private static void trace(final String message)
    {
@@ -668,13 +667,17 @@
     */
    private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
    {
+      final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
+
       if (availableCredits != null)
       {
-         availableCredits.addAndGet(-message.getEncodeSize());
+         availableCredits.addAndGet(-packet.getRequiredBufferSize());
+         if (trace)
+         {
+            log.trace("Taking " + packet.getRequiredBufferSize() + " out of flow control");
+         }
       }
 
-      final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
-
       if (replicatingChannel == null)
       {
          // Not replicated - just send now




More information about the jboss-cvs-commits mailing list