[jboss-cvs] JBoss Messaging SVN: r6625 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Apr 29 17:18:47 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-04-29 17:18:47 -0400 (Wed, 29 Apr 2009)
New Revision: 6625

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
Log:
SlowConsumer & ClientConsumerWindowSize fix

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-29 21:18:47 UTC (rev 6625)
@@ -92,6 +92,8 @@
    private volatile boolean closed;
 
    private volatile int creditsToSend;
+   
+   private volatile boolean slowConsumerInitialCreditSent = false;
 
    private volatile Exception lastException;
 
@@ -154,10 +156,10 @@
          throw new MessagingException(MessagingException.ILLEGAL_STATE,
                                       "Cannot call receive(...) - a MessageHandler is set");
       }
-
-      if (clientWindowSize == 0 && buffer.isEmpty())
+      
+      if (clientWindowSize == 0)
       {
-         sendCredits(1);
+         startSlowConsumer();
       }
 
       receiverThread = Thread.currentThread();
@@ -221,6 +223,11 @@
                   
                   session.expire(id, m.getMessageID());
 
+                  if (clientWindowSize == 0)
+                  {
+                     startSlowConsumer();
+                  }                 
+
                   if (toWait > 0)
                   {
                      continue;
@@ -283,7 +290,7 @@
 
       if (handler != theHandler && clientWindowSize == 0)
       {
-         sendCredits(1);
+         startSlowConsumer();
       }
 
       handler = theHandler;
@@ -409,7 +416,7 @@
       }
       
       // Flow control for the first packet, we will have others
-      flowControl(packet.getPacketSize(), true);
+      flowControl(packet.getPacketSize(), false);
 
       currentChunkMessage = new ClientMessageImpl();
       
@@ -482,8 +489,10 @@
     * flow control is synchornized because of LargeMessage and streaming.
     * LargeMessageBuffer will call flowcontrol here, while other handleMessage will also be calling flowControl.
     * So, this operation needs to be atomic.
+    * 
+    * @parameter discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
     * */
-   public void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException
+   public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws MessagingException
    {
       if (clientWindowSize >= 0)
       {
@@ -492,37 +501,35 @@
          if (creditsToSend >= clientWindowSize)
          {
             
-            if (isLargeMessage)
+            if (clientWindowSize == 0 && discountSlowConsumer)
             {
-               // Flowcontrol on largeMessages continuations needs to be done in a separate thread or failover would
-               // block
-               final int credits = creditsToSend;
+               if (trace)
+               {
+                  log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+               }
+               
+               slowConsumerInitialCreditSent = false;
 
+               // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
+               // always buffering one after received the first message
+               final int credits = creditsToSend - 1;
+
                creditsToSend = 0;
-
+               
                sendCredits(credits);
             }
             else
             {
-               if (clientWindowSize == 0)
+               if (trace)
                {
-                  if (trace)
-                  {
-                     log.trace("Sending full credits - 1 for slow consumer");
-                  }
-                  // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
-                  // always buffering one after received the first message
-                  sendCredits(creditsToSend - 1);
+                  log.trace("Sending " + messageBytes + " from flow-control");
                }
-               else
-               {
-                  if (trace)
-                  {
-                     log.trace("Sending full credits");
-                  }
-                  sendCredits(creditsToSend);
-               }
+             
+               final int credits = creditsToSend;
+
                creditsToSend = 0;
+
+               sendCredits(credits);
             }
          }
       }
@@ -540,6 +547,22 @@
    // Private
    // ---------------------------------------------------------------------------------------
 
+   /** 
+    * Sending a initial credit for slow consumers
+    * */
+   private void startSlowConsumer()
+   {
+      if (!slowConsumerInitialCreditSent)
+      {
+         if (trace)
+         {
+            log.trace("Sending 1 credit to start delivering of one message to slow consumer");
+         }
+         slowConsumerInitialCreditSent = true;
+         sendCredits(1);
+      }
+   }
+
    private void requeueExecutors()
    {
       for (int i = 0; i < buffer.size(); i++)
@@ -564,7 +587,7 @@
    {
       if (trace)
       {
-         log.trace("Sending " + credits + " back");
+         log.trace("Sending " + credits + " credits back", new Exception ("trace"));
       }
       channel.send(new SessionConsumerFlowCreditMessage(id, credits));
    }
@@ -664,11 +687,7 @@
             // If slow consumer, we need to send 1 credit to make sure we get another message
             if (clientWindowSize == 0)
             {
-               if (trace)
-               {
-                  log.trace("Sending 1 credit back after consuming message on slow consumer (no-buffering)");
-               }
-               sendCredits(1);
+               startSlowConsumer();
             }
          }
       }
@@ -683,7 +702,7 @@
       // Chunk messages will execute the flow control while receiving the chunks
       if (message.getFlowControlSize() != 0)
       {
-         flowControl(message.getFlowControlSize(), false);
+         flowControl(message.getFlowControlSize(), true);
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2009-04-29 21:18:47 UTC (rev 6625)
@@ -45,7 +45,7 @@
    
    void handleLargeMessageContinuation(SessionReceiveContinuationMessage continuation) throws Exception;
    
-   void flowControl(final int messageBytes, final boolean isLargeMessage) throws MessagingException;
+   void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws MessagingException;
 
    void clear();
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java	2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java	2009-04-29 21:18:47 UTC (rev 6625)
@@ -123,38 +123,56 @@
     * Add a buff to the List, or save it to the OutputStream if set
     * @param packet
     */
-   public synchronized void addPacket(final SessionReceiveContinuationMessage packet)
+   public void addPacket(final SessionReceiveContinuationMessage packet)
    {
-      if (outStream != null)
+      int flowControlCredit = 0;
+      boolean continues = false;
+      
+      synchronized (this)
       {
-         try
+         if (outStream != null)
          {
-            if (!packet.isContinues())
+            try
             {
-               streamEnded = true;
-            }
+               if (!packet.isContinues())
+               {
+                  streamEnded = true;
+               }
 
-            outStream.write(packet.getBody());
+               outStream.write(packet.getBody());
 
-            consumerInternal.flowControl(packet.getPacketSize(), true);
+               flowControlCredit = packet.getPacketSize();
+               continues = packet.isContinues();
 
-            notifyAll();
+               notifyAll();
 
-            if (streamEnded)
+               if (streamEnded)
+               {
+                  outStream.close();
+               }
+            }
+            catch (Exception e)
             {
-               outStream.close();
+               handledException = e;
             }
          }
+         else
+         {
+            packets.offer(packet);
+         }
+      }
+
+      if (flowControlCredit != 0)
+      {
+         try
+         {
+            consumerInternal.flowControl(flowControlCredit, !continues);
+         }
          catch (Exception e)
          {
             handledException = e;
-
          }
       }
-      else
-      {
-         packets.offer(packet);
-      }
    }
 
    public synchronized void close()
@@ -164,25 +182,35 @@
       notifyAll();
    }
 
-   public synchronized void setOutputStream(final OutputStream output) throws MessagingException
+   public void setOutputStream(final OutputStream output) throws MessagingException
    {
-      if (currentPacket != null)
+
+      int totalFlowControl = 0;
+      boolean continues = false;
+
+      synchronized (this)
       {
-         sendPacketToOutput(output, currentPacket);
-         currentPacket = null;
-      }
-      while (true)
-      {
-         SessionReceiveContinuationMessage packet = packets.poll();
-         if (packet == null)
+         if (currentPacket != null)
          {
-            break;
+            sendPacketToOutput(output, currentPacket);
+            currentPacket = null;
          }
-         consumerInternal.flowControl(packet.getPacketSize(), true);
-         sendPacketToOutput(output, packet);
+         while (true)
+         {
+            SessionReceiveContinuationMessage packet = packets.poll();
+            if (packet == null)
+            {
+               break;
+            }
+            totalFlowControl += packet.getPacketSize();
+            continues = packet.isContinues();
+            sendPacketToOutput(output, packet);
+         }
+
+         outStream = output;
       }
 
-      outStream = output;
+      consumerInternal.flowControl(totalFlowControl, !continues);
    }
 
    public synchronized void saveBuffer(final OutputStream output) throws MessagingException
@@ -1134,7 +1162,7 @@
             throw new IndexOutOfBoundsException();
          }
 
-         consumerInternal.flowControl(currentPacket.getPacketSize(), true);
+         consumerInternal.flowControl(currentPacket.getPacketSize(), !currentPacket.isContinues());
 
          packetPosition += sizeToAdd;
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-04-29 21:18:47 UTC (rev 6625)
@@ -327,6 +327,11 @@
       {
          int previous = availableCredits.getAndAdd(credits);
 
+         if (trace)
+         {
+            log.trace("Received " + credits + " credits, previous value = " + previous + " currentValue = " + availableCredits.get());
+         }
+         
          if (previous <= 0 && previous + credits > 0)
          {
             promptDelivery();
@@ -485,6 +490,10 @@
 
    private void promptDelivery()
    {
+      if (trace)
+      {
+         log.trace("Starting prompt delivery");
+      }
       lock.lock();
       try
       {
@@ -932,6 +941,10 @@
             // Since we are not sending anything to the client during this calculation, this is unlikely to happen
             if (availableCredits.compareAndSet(currentCredit, currentCredit - precalculatedCredits))
             {
+               if (trace)
+               {
+                  log.trace("Taking " + precalculatedCredits + " credits out on preCalculateFlowControl (largeMessage)");
+               }
                return precalculatedCredits;
             }
          }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-04-29 15:45:32 UTC (rev 6624)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java	2009-04-29 21:18:47 UTC (rev 6625)
@@ -240,13 +240,41 @@
 
          ClientConsumerInternal consNeverUsed = (ClientConsumerInternal)sessionB.createConsumer(ADDRESS);
 
+         ClientProducer prod = session.createProducer(ADDRESS);
+         
+         // This will force a credit to be sent, but if the message wasn't received we need to take out that credit from the server
+         // or the client will be buffering messages
+         assertNull(consNeverUsed.receive(1));
+         
+         ClientMessage msg = createTextMessage(session, "This one will expire");
+         if (largeMessages)
+         {
+            msg.getBody().writeBytes(new byte[600]);
+         }
+         
+         msg.setExpiration(System.currentTimeMillis() + 100);
+         prod.send(msg);
+         
+         msg = createTextMessage(session, "First-on-non-buffered");
+
+         prod.send(msg);
+         
+         Thread.sleep(110);
+         
+         // It will be able to receive another message, but it shouldn't send a credit again, as the credit was already sent
+         msg = consNeverUsed.receive(TIMEOUT * 1000);
+         assertNotNull(msg);
+         assertEquals("First-on-non-buffered", getTextMessage(msg));
+         msg.acknowledge();
+         
+
          ClientConsumer cons1 = session.createConsumer(ADDRESS);
 
-         ClientProducer prod = session.createProducer(ADDRESS);
 
+         
          for (int i = 0; i < numberOfMessages; i++)
          {
-            ClientMessage msg = createTextMessage(session, "Msg" + i);
+            msg = createTextMessage(session, "Msg" + i);
 
             if (largeMessages)
             {
@@ -258,7 +286,7 @@
 
          for (int i = 0; i < numberOfMessages; i++)
          {
-            ClientMessage msg = cons1.receive(1000);
+            msg = cons1.receive(1000);
             assertNotNull("expected message at i = " + i, msg);
             assertEquals("Msg" + i, getTextMessage(msg));
             msg.acknowledge();




More information about the jboss-cvs-commits mailing list