[hornetq-commits] JBoss hornetq SVN: r11541 - in trunk/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl/wireformat and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 14 12:54:57 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-10-14 12:54:56 -0400 (Fri, 14 Oct 2011)
New Revision: 11541

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
JBPAPP-7389 - flow control on large messages

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-10-14 16:54:56 UTC (rev 11541)
@@ -572,7 +572,7 @@
          largeMessageCache.deleteOnExit();
       }
 
-      currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+      currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 5, largeMessageCache);
 
       if (currentChunkMessage.isCompressed())
       {
@@ -594,7 +594,18 @@
       {
          return;
       }
-      currentLargeMessageController.addPacket(chunk);
+      if (currentLargeMessageController == null)
+      {
+         if (log.isTraceEnabled())
+         {
+            log.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+         }
+         flowControl(chunk.getPacketSize(), false);
+      }
+      else
+      {
+         currentLargeMessageController.addPacket(chunk);
+      }
    }
 
    public void clear(boolean waitForOnMessage) throws HornetQException
@@ -607,12 +618,39 @@
 
          while (iter.hasNext())
          {
-            ClientMessageInternal message = iter.next();
+            try
+            {
+               ClientMessageInternal message = iter.next();
+               
+               if (message.isLargeMessage())
+               {
+                  ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
+                  largeMessage.getLargeMessageController().cancel();
+               }
 
-            flowControlBeforeConsumption(message);
+               flowControlBeforeConsumption(message);
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+            }
          }
 
          clearBuffer();
+         
+         try
+         {
+            if (currentLargeMessageController != null)
+            {
+               currentLargeMessageController.cancel();
+               currentLargeMessageController = null;
+            }
+         }
+         catch (Throwable e)
+         {
+            // nothing that could be done here
+            log.warn(e.getMessage(), e);
+         }
       }
 
       // Need to send credits for the messages in the buffer
@@ -678,6 +716,11 @@
       if (clientWindowSize >= 0)
       {
          creditsToSend += messageBytes;
+         
+         if (log.isTraceEnabled())
+         {
+            log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
+         }
 
          if (creditsToSend >= clientWindowSize)
          {
@@ -685,7 +728,7 @@
             {
                if (ClientConsumerImpl.isTrace)
                {
-                  ClientConsumerImpl.log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+                  ClientConsumerImpl.log.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
                }
 
                // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java	2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/LargeMessageControllerImpl.java	2011-10-14 16:54:56 UTC (rev 11541)
@@ -31,6 +31,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.UTF8Util;
@@ -141,7 +142,7 @@
          {
             checkForPacket(totalSize - 1);
          }
-         catch (Exception ignored)
+         catch (Throwable ignored)
          {
          }
       }
@@ -227,6 +228,24 @@
 
    public synchronized void cancel()
    {
+      
+      int totalSize = 0;
+      Packet polledPacket = null;
+      while ((polledPacket = packets.poll()) != null)
+      {
+         totalSize += polledPacket.getPacketSize();
+      }
+
+      try
+      {
+         consumerInternal.flowControl(totalSize, false);
+      }
+      catch (Exception ignored)
+      {
+         // what else can we do here?
+         log.warn(ignored.getMessage(), ignored);
+      }
+       
       packets.offer(new SessionReceiveContinuationMessage());
       streamEnded = true;
       streamClosed = true;
@@ -279,6 +298,11 @@
 
    public synchronized void saveBuffer(final OutputStream output) throws HornetQException
    {
+      if (streamClosed)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                    "The large message lost connection with its session, either because of a rollback or a closed session");
+      }
       setOutputStream(output);
       waitCompletion(0);
    }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java	2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java	2011-10-14 16:54:56 UTC (rev 11541)
@@ -64,7 +64,14 @@
     */
    public byte[] getBody()
    {
-      return body;
+      if (size <= 0)
+      {
+         return new byte[0];
+      }
+      else
+      {
+         return body;
+      }
    }
 
    /**

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java	2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java	2011-10-14 16:54:56 UTC (rev 11541)
@@ -77,6 +77,21 @@
       super.encodeRest(buffer);
       buffer.writeLong(consumerID);
    }
+   @Override
+   public int getPacketSize()
+   {
+      if (size == -1)
+      {
+         // This packet was created by the LargeMessageController
+         return 0;
+      }
+      else
+      {
+         return size;
+      }
+   }
+   
+   
 
    @Override
    public void decodeRest(final HornetQBuffer buffer)

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-10-14 16:54:26 UTC (rev 11540)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-10-14 16:54:56 UTC (rev 11541)
@@ -61,7 +61,7 @@
    // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-   
+
    private static boolean isTrace = log.isTraceEnabled();
 
    // Static ---------------------------------------------------------------------------------------
@@ -85,7 +85,7 @@
    private boolean started;
 
    private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-   
+
    public String debug()
    {
       return toString() + "::Delivering " + this.deliveringRefs.size();
@@ -115,7 +115,7 @@
    private final Binding binding;
 
    private boolean transferring = false;
-   
+
    /* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
     * This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
     * write queue when the TCP buffer is full, e.g. the client is slow or has died.    
@@ -163,11 +163,11 @@
       minLargeMessageSize = session.getMinLargeMessageSize();
 
       this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-      
+
       this.callback.addReadyListener(this);
 
       this.creationTime = System.currentTimeMillis();
-      
+
       if (browseOnly)
       {
          browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -185,7 +185,7 @@
    {
       return id;
    }
-   
+
    public boolean isBrowseOnly()
    {
       return browseOnly;
@@ -195,12 +195,12 @@
    {
       return creationTime;
    }
-   
+
    public String getConnectionID()
    {
       return this.session.getConnectionID().toString();
    }
-   
+
    public String getSessionID()
    {
       return this.session.getName();
@@ -210,20 +210,23 @@
    {
       if (availableCredits != null && availableCredits.get() <= 0)
       {
-         if (log.isDebugEnabled() )
+         if (log.isDebugEnabled())
          {
-            log.debug(this  + " is busy for the lack of credits!!!");
+            log.debug(this + " is busy for the lack of credits. Current credits = " +
+                      availableCredits +
+                      " Can't receive reference " +
+                      ref);
          }
-         
+
          return HandleStatus.BUSY;
       }
-      
-// TODO - https://jira.jboss.org/browse/HORNETQ-533      
-//      if (!writeReady.get())
-//      {
-//         return HandleStatus.BUSY;
-//      }
-      
+
+      // TODO - https://jira.jboss.org/browse/HORNETQ-533
+      // if (!writeReady.get())
+      // {
+      // return HandleStatus.BUSY;
+      // }
+
       synchronized (lock)
       {
          // If the consumer is stopped then we don't accept the message, it
@@ -233,11 +236,18 @@
          {
             return HandleStatus.BUSY;
          }
-         
+
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
          if (largeMessageDeliverer != null)
          {
+            if (log.isDebugEnabled())
+            {
+               log.debug(this + " is busy delivering large message " +
+                         largeMessageDeliverer +
+                         ", can't deliver reference " +
+                         ref);
+            }
             return HandleStatus.BUSY;
          }
 
@@ -268,7 +278,9 @@
             // the updateDeliveryCount would still be updated after c
             if (strictUpdateDeliveryCount && !ref.isPaged())
             {
-               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue() && !ref.isPaged())
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+                   !ref.getQueue().isInternalQueue() &&
+                   !ref.isPaged())
                {
                   storageManager.updateDeliveryCount(ref);
                }
@@ -309,7 +321,7 @@
    public void close(final boolean failed) throws Exception
    {
       callback.removeReadyListener(this);
-      
+
       setStarted(false);
 
       if (largeMessageDeliverer != null)
@@ -355,8 +367,8 @@
 
          props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
 
-         props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null
-                                                                                        : filter.getFilterString());
+         props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+                                       filter == null ? null : filter.getFilterString());
 
          props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
@@ -406,10 +418,28 @@
       }
    }
 
-   public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+   public LinkedList<MessageReference> cancelRefs(final boolean failed,
+                                                  final boolean lastConsumedAsDelivered,
+                                                  final Transaction tx) throws Exception
    {
       boolean performACK = lastConsumedAsDelivered;
 
+      try
+      {
+         if (largeMessageDeliverer != null)
+         {
+            largeMessageDeliverer.finish();
+         }
+      }
+      catch (Throwable e)
+      {
+         log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
+      }
+      finally
+      {
+         largeMessageDeliverer = null;
+      }
+
       LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
 
       if (!deliveringRefs.isEmpty())
@@ -430,8 +460,9 @@
             {
                if (!failed)
                {
-                  //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
-                  //before failure
+                  // We don't decrement delivery count if the client failed, since there's a possibility that refs
+                  // were actually delivered but we just didn't get any acks for them
+                  // before failure
                   ref.decrementDeliveryCount();
                }
 
@@ -492,18 +523,23 @@
    }
 
    public void receiveCredits(final int credits) throws Exception
-   {      
+   {
       if (credits == -1)
       {
+         if (log.isDebugEnabled())
+         {
+            log.debug(this + ":: FlowControl::Received disable flow control message");
+         }
          // No flow control
          availableCredits = null;
-         
-         //There may be messages already in the queue
+
+         // There may be messages already in the queue
          promptDelivery();
       }
       else if (credits == 0)
       {
-         //reset, used on slow consumers
+         // reset, used on slow consumers
+         log.debug(this + ":: FlowControl::Received reset flow control message");
          availableCredits.set(0);
       }
       else
@@ -512,16 +548,17 @@
 
          if (log.isDebugEnabled())
          {
-            log.debug(this + "::Received " + credits +
-                                     " credits, previous value = " +
-                                     previous +
-                                     " currentValue = " +
-                                     availableCredits.get());
+            log.debug(this + "::FlowControl::Received " +
+                      credits +
+                      " credits, previous value = " +
+                      previous +
+                      " currentValue = " +
+                      availableCredits.get());
          }
 
          if (previous <= 0 && previous + credits > 0)
          {
-            if (log.isTraceEnabled() )
+            if (log.isTraceEnabled())
             {
                log.trace(this + "::calling promptDelivery from receiving credits");
             }
@@ -541,7 +578,7 @@
       {
          return;
       }
-      
+
       // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
       // acknowledged
 
@@ -573,21 +610,21 @@
       }
       while (ref.getMessage().getMessageID() != messageID);
    }
-   
+
    public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
    {
       if (browseOnly)
       {
          return;
       }
-      
+
       MessageReference ref = removeReferenceByID(messageID);
-      
+
       if (ref == null)
       {
          throw new IllegalStateException("Cannot find ref to ack " + messageID);
       }
-      
+
       if (autoCommitAcks)
       {
          ref.getQueue().acknowledge(ref);
@@ -627,13 +664,13 @@
 
       return ref;
    }
-      
+
    public void readyForWriting(final boolean ready)
    {
       if (ready)
       {
          writeReady.set(true);
-         
+
          promptDelivery();
       }
       else
@@ -701,10 +738,17 @@
       if (availableCredits != null)
       {
          availableCredits.addAndGet(-packetSize);
+
+         if (log.isTraceEnabled())
+         {
+            log.trace(this + "::FlowControl::delivery standard taking " +
+                      packetSize +
+                      " from credits, available now is " +
+                      availableCredits);
+         }
       }
    }
 
-
    // Inner classes
    // ------------------------------------------------------------------------
 
@@ -766,6 +810,12 @@
 
             if (availableCredits != null && availableCredits.get() <= 0)
             {
+               if (log.isTraceEnabled())
+               {
+                  log.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+                            availableCredits);
+               }
+
                return false;
             }
 
@@ -774,7 +824,7 @@
                context = largeMessage.getBodyEncoder();
 
                sizePendingLargeMessage = context.getLargeBodySize();
-               
+
                context.open();
 
                sentInitialPacket = true;
@@ -787,6 +837,15 @@
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-packetSize);
+
+                  if (log.isTraceEnabled())
+                  {
+                     log.trace(this + "::FlowControl::" +
+                               " deliver initialpackage with " +
+                               packetSize +
+                               " delivered, available now = " +
+                               availableCredits);
+                  }
                }
 
                // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -802,7 +861,8 @@
                {
                   if (ServerConsumerImpl.isTrace)
                   {
-                     log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+                     log.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+                               availableCredits);
                   }
 
                   return false;
@@ -825,16 +885,17 @@
 
                int chunkLen = body.length;
 
-               if (ServerConsumerImpl.isTrace)
-               {
-                  log.trace("deliverLargeMessage: Sending " + packetSize +
-                                           " availableCredits now is " +
-                                           availableCredits);
-               }
-
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-packetSize);
+
+                  if (log.isTraceEnabled())
+                  {
+                     log.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+                               packetSize +
+                               " available now=" +
+                               availableCredits);
+                  }
                }
 
                positionPendingLargeMessage += chunkLen;
@@ -898,7 +959,7 @@
       }
 
       private final LinkedListIterator<MessageReference> iterator;
-      
+
       public synchronized void close()
       {
          iterator.close();



More information about the hornetq-commits mailing list