[hornetq-commits] JBoss hornetq SVN: r11546 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392: src/main/org/hornetq/core/protocol/core/impl/wireformat and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 14 14:52:56 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-10-14 14:52:56 -0400 (Fri, 14 Oct 2011)
New Revision: 11546

Modified:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7389 - flow control on large message

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-10-14 18:52:56 UTC (rev 11546)
@@ -574,7 +574,7 @@
          largeMessageCache.deleteOnExit();
       }
 
-      currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+      currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 5, largeMessageCache);
 
       if (currentChunkMessage.isCompressed())
       {
@@ -596,7 +596,18 @@
       {
          return;
       }
-      currentLargeMessageController.addPacket(chunk);
+      if (currentLargeMessageController == null)
+      {
+         if (log.isTraceEnabled())
+         {
+            log.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+         }
+         flowControl(chunk.getPacketSize(), false);
+      }
+      else
+      {
+         currentLargeMessageController.addPacket(chunk);
+      }
    }
 
    public void clear(boolean waitForOnMessage) throws HornetQException
@@ -609,12 +620,39 @@
 
          while (iter.hasNext())
          {
-            ClientMessageInternal message = iter.next();
+            try
+            {
+               ClientMessageInternal message = iter.next();
+               
+               if (message.isLargeMessage())
+               {
+                  ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
+                  largeMessage.getLargeMessageController().cancel();
+               }
 
-            flowControlBeforeConsumption(message);
+               flowControlBeforeConsumption(message);
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+            }
          }
 
          clearBuffer();
+         
+         try
+         {
+            if (currentLargeMessageController != null)
+            {
+               currentLargeMessageController.cancel();
+               currentLargeMessageController = null;
+            }
+         }
+         catch (Throwable e)
+         {
+            // nothing that could be done here
+            log.warn(e.getMessage(), e);
+         }
       }
 
       // Need to send credits for the messages in the buffer
@@ -680,6 +718,11 @@
       if (clientWindowSize >= 0)
       {
          creditsToSend += messageBytes;
+         
+         if (log.isTraceEnabled())
+         {
+            log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
+         }
 
          if (creditsToSend >= clientWindowSize)
          {
@@ -687,7 +730,7 @@
             {
                if (ClientConsumerImpl.trace)
                {
-                  ClientConsumerImpl.log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+                  ClientConsumerImpl.log.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
                }
 
                // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java	2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java	2011-10-14 18:52:56 UTC (rev 11546)
@@ -31,6 +31,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.UTF8Util;
@@ -141,7 +142,7 @@
          {
             checkForPacket(totalSize - 1);
          }
-         catch (Exception ignored)
+         catch (Throwable ignored)
          {
          }
       }
@@ -227,6 +228,24 @@
 
    public synchronized void cancel()
    {
+      
+      int totalSize = 0;
+      Packet polledPacket = null;
+      while ((polledPacket = packets.poll()) != null)
+      {
+         totalSize += polledPacket.getPacketSize();
+      }
+
+      try
+      {
+         consumerInternal.flowControl(totalSize, false);
+      }
+      catch (Exception ignored)
+      {
+         // what else can we do here?
+         log.warn(ignored.getMessage(), ignored);
+      }
+       
       packets.offer(new SessionReceiveContinuationMessage());
       streamEnded = true;
       streamClosed = true;
@@ -279,6 +298,11 @@
 
    public synchronized void saveBuffer(final OutputStream output) throws HornetQException
    {
+      if (streamClosed)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                    "The large message lost connection with its session, either because of a rollback or a closed session");
+      }
       setOutputStream(output);
       waitCompletion(0);
    }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java	2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java	2011-10-14 18:52:56 UTC (rev 11546)
@@ -64,7 +64,14 @@
     */
    public byte[] getBody()
    {
-      return body;
+      if (size <= 0)
+      {
+         return new byte[0];
+      }
+      else
+      {
+         return body;
+      }
    }
 
    /**

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java	2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java	2011-10-14 18:52:56 UTC (rev 11546)
@@ -77,6 +77,22 @@
       super.encodeRest(buffer);
       buffer.writeLong(consumerID);
    }
+   @Override
+   public int getPacketSize()
+   {
+      if (size == -1)
+      {
+         // This packet was created by the LargeMessageController
+         // TODO: Get rid of this scenario
+         return 0;
+      }
+      else
+      {
+         return size;
+      }
+   }
+   
+   
 
    @Override
    public void decodeRest(final HornetQBuffer buffer)

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2011-10-14 18:52:56 UTC (rev 11546)
@@ -17,7 +17,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-10-14 18:52:56 UTC (rev 11546)
@@ -61,7 +61,7 @@
    // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-   
+
    private static boolean isTrace = log.isTraceEnabled();
 
    // Static ---------------------------------------------------------------------------------------
@@ -85,14 +85,12 @@
    private boolean started;
 
    private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-   
+
    public String debug()
    {
       return toString() + "::Delivering " + this.deliveringRefs.size();
    }
 
-   private boolean largeMessageInDelivery;
-
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
     */
@@ -117,7 +115,7 @@
    private final Binding binding;
 
    private boolean transferring = false;
-   
+
    /* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
     * This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
     * write queue when the TCP buffer is full, e.g. the client is slow or has died.    
@@ -165,11 +163,11 @@
       minLargeMessageSize = session.getMinLargeMessageSize();
 
       this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-      
+
       this.callback.addReadyListener(this);
 
       this.creationTime = System.currentTimeMillis();
-      
+
       if (browseOnly)
       {
          browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -187,7 +185,7 @@
    {
       return id;
    }
-   
+
    public boolean isBrowseOnly()
    {
       return browseOnly;
@@ -197,12 +195,12 @@
    {
       return creationTime;
    }
-   
+
    public String getConnectionID()
    {
       return this.session.getConnectionID().toString();
    }
-   
+
    public String getSessionID()
    {
       return this.session.getName();
@@ -212,20 +210,23 @@
    {
       if (availableCredits != null && availableCredits.get() <= 0)
       {
-         if (log.isDebugEnabled() )
+         if (log.isDebugEnabled())
          {
-            log.debug(this  + " is busy for the lack of credits!!!");
+            log.debug(this + " is busy for the lack of credits. Current credits = " +
+                      availableCredits +
+                      " Can't receive reference " +
+                      ref);
          }
-         
+
          return HandleStatus.BUSY;
       }
-      
-// TODO - https://jira.jboss.org/browse/HORNETQ-533      
-//      if (!writeReady.get())
-//      {
-//         return HandleStatus.BUSY;
-//      }
-      
+
+      // TODO - https://jira.jboss.org/browse/HORNETQ-533
+      // if (!writeReady.get())
+      // {
+      // return HandleStatus.BUSY;
+      // }
+
       synchronized (lock)
       {
          // If the consumer is stopped then we don't accept the message, it
@@ -238,11 +239,23 @@
 
          // If there is a pendingLargeMessage we can't take another message
          // This has to be checked inside the lock as the set to null is done inside the lock
-         if (largeMessageInDelivery)
+         if (largeMessageDeliverer != null)
          {
+            if (log.isDebugEnabled())
+            {
+               log.debug(this + " is busy delivering large message " +
+                         largeMessageDeliverer +
+                         ", can't deliver reference " +
+                         ref);
+            }
             return HandleStatus.BUSY;
          }
 
+         if (log.isTraceEnabled())
+         {
+            log.trace("Handling reference " + ref);
+         }
+
          final ServerMessage message = ref.getMessage();
 
          if (filter != null && !filter.match(message))
@@ -265,7 +278,9 @@
             // the updateDeliveryCount would still be updated after c
             if (strictUpdateDeliveryCount && !ref.isPaged())
             {
-               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
+               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+                   !ref.getQueue().isInternalQueue() &&
+                   !ref.isPaged())
                {
                   storageManager.updateDeliveryCount(ref);
                }
@@ -306,7 +321,7 @@
    public void close(final boolean failed) throws Exception
    {
       callback.removeReadyListener(this);
-      
+
       setStarted(false);
 
       if (largeMessageDeliverer != null)
@@ -352,8 +367,8 @@
 
          props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
 
-         props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null
-                                                                                        : filter.getFilterString());
+         props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+                                       filter == null ? null : filter.getFilterString());
 
          props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
@@ -403,10 +418,28 @@
       }
    }
 
-   public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+   public LinkedList<MessageReference> cancelRefs(final boolean failed,
+                                                  final boolean lastConsumedAsDelivered,
+                                                  final Transaction tx) throws Exception
    {
       boolean performACK = lastConsumedAsDelivered;
 
+      try
+      {
+         if (largeMessageDeliverer != null)
+         {
+            largeMessageDeliverer.finish();
+         }
+      }
+      catch (Throwable e)
+      {
+         log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
+      }
+      finally
+      {
+         largeMessageDeliverer = null;
+      }
+
       LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
 
       if (!deliveringRefs.isEmpty())
@@ -427,8 +460,9 @@
             {
                if (!failed)
                {
-                  //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
-                  //before failure
+                  // We don't decrement delivery count if the client failed, since there's a possibility that refs
+                  // were actually delivered but we just didn't get any acks for them
+                  // before failure
                   ref.decrementDeliveryCount();
                }
 
@@ -461,21 +495,6 @@
       synchronized (lock)
       {
          this.transferring = transferring;
-
-         if (transferring)
-         {
-            // Now we must wait for any large message delivery to finish
-            while (largeMessageInDelivery)
-            {
-               try
-               {
-                  Thread.sleep(1);
-               }
-               catch (InterruptedException ignore)
-               {
-               }
-            }
-         }
       }
 
       // Outside the lock
@@ -504,18 +523,23 @@
    }
 
    public void receiveCredits(final int credits) throws Exception
-   {      
+   {
       if (credits == -1)
       {
+         if (log.isDebugEnabled())
+         {
+            log.debug(this + ":: FlowControl::Received disable flow control message");
+         }
          // No flow control
          availableCredits = null;
-         
-         //There may be messages already in the queue
+
+         // There may be messages already in the queue
          promptDelivery();
       }
       else if (credits == 0)
       {
-         //reset, used on slow consumers
+         // reset, used on slow consumers
+         log.debug(this + ":: FlowControl::Received reset flow control message");
          availableCredits.set(0);
       }
       else
@@ -524,16 +548,17 @@
 
          if (log.isDebugEnabled())
          {
-            log.debug(this + "::Received " + credits +
-                                     " credits, previous value = " +
-                                     previous +
-                                     " currentValue = " +
-                                     availableCredits.get());
+            log.debug(this + "::FlowControl::Received " +
+                      credits +
+                      " credits, previous value = " +
+                      previous +
+                      " currentValue = " +
+                      availableCredits.get());
          }
 
          if (previous <= 0 && previous + credits > 0)
          {
-            if (log.isTraceEnabled() )
+            if (log.isTraceEnabled())
             {
                log.trace(this + "::calling promptDelivery from receiving credits");
             }
@@ -553,7 +578,7 @@
       {
          return;
       }
-      
+
       // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
       // acknowledged
 
@@ -585,21 +610,21 @@
       }
       while (ref.getMessage().getMessageID() != messageID);
    }
-   
+
    public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
    {
       if (browseOnly)
       {
          return;
       }
-      
+
       MessageReference ref = removeReferenceByID(messageID);
-      
+
       if (ref == null)
       {
          throw new IllegalStateException("Cannot find ref to ack " + messageID);
       }
-      
+
       if (autoCommitAcks)
       {
          ref.getQueue().acknowledge(ref);
@@ -639,13 +664,13 @@
 
       return ref;
    }
-      
+
    public void readyForWriting(final boolean ready)
    {
       if (ready)
       {
          writeReady.set(true);
-         
+
          promptDelivery();
       }
       else
@@ -664,28 +689,35 @@
 
    private void promptDelivery()
    {
-      synchronized (lock)
+      if (largeMessageDeliverer != null)
       {
-         // largeMessageDeliverer is aways set inside a lock
-         // if we don't acquire a lock, we will have NPE eventually
-         if (largeMessageDeliverer != null)
+         resumeLargeMessage();
+      }
+      else
+      {
+         if (browseOnly)
          {
-            resumeLargeMessage();
+            messageQueue.getExecutor().execute(browserDeliverer);
          }
          else
          {
-            if (browseOnly)
-            {
-               messageQueue.getExecutor().execute(browserDeliverer);
-            }
-            else
-            {
-               messageQueue.forceDelivery();
-            }
+            messageQueue.forceDelivery();
          }
       }
    }
 
+   private void forceDelivery()
+   {
+      if (browseOnly)
+      {
+         messageQueue.getExecutor().execute(browserDeliverer);
+      }
+      else
+      {
+         messageQueue.deliverAsync();
+      }
+   }
+
    private void resumeLargeMessage()
    {
       messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
@@ -693,8 +725,6 @@
 
    private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
    {
-      largeMessageInDelivery = true;
-
       final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
 
       // it doesn't need lock because deliverLargeMesasge is already inside the lock()
@@ -713,6 +743,14 @@
       if (availableCredits != null)
       {
          availableCredits.addAndGet(-packetSize);
+
+         if (log.isTraceEnabled())
+         {
+            log.trace(this + "::FlowControl::delivery standard taking " +
+                      packetSize +
+                      " from credits, available now is " +
+                      availableCredits);
+         }
       }
    }
 
@@ -729,16 +767,7 @@
             {
                if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
                {
-                  if (browseOnly)
-                  {
-                     messageQueue.getExecutor().execute(browserDeliverer);
-                  }
-                  else
-                  {
-                     // prompt Delivery only if chunk was finished
-
-                     messageQueue.deliverAsync();
-                  }
+                  forceDelivery();
                }
             }
             catch (Exception e)
@@ -786,6 +815,12 @@
 
             if (availableCredits != null && availableCredits.get() <= 0)
             {
+               if (log.isTraceEnabled())
+               {
+                  log.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+                            availableCredits);
+               }
+
                return false;
             }
 
@@ -794,7 +829,7 @@
                context = largeMessage.getBodyEncoder();
 
                sizePendingLargeMessage = context.getLargeBodySize();
-               
+
                context.open();
 
                sentInitialPacket = true;
@@ -807,6 +842,15 @@
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-packetSize);
+
+                  if (log.isTraceEnabled())
+                  {
+                     log.trace(this + "::FlowControl::" +
+                               " deliver initialpackage with " +
+                               packetSize +
+                               " delivered, available now = " +
+                               availableCredits);
+                  }
                }
 
                // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -822,7 +866,8 @@
                {
                   if (ServerConsumerImpl.isTrace)
                   {
-                     log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+                     log.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+                               availableCredits);
                   }
 
                   return false;
@@ -845,16 +890,17 @@
 
                int chunkLen = body.length;
 
-               if (ServerConsumerImpl.isTrace)
-               {
-                  log.trace("deliverLargeMessage: Sending " + packetSize +
-                                           " availableCredits now is " +
-                                           availableCredits);
-               }
-
                if (availableCredits != null)
                {
                   availableCredits.addAndGet(-packetSize);
+
+                  if (log.isTraceEnabled())
+                  {
+                     log.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+                               packetSize +
+                               " available now=" +
+                               availableCredits);
+                  }
                }
 
                positionPendingLargeMessage += chunkLen;
@@ -903,8 +949,6 @@
 
             largeMessageDeliverer = null;
 
-            largeMessageInDelivery = false;
-
             largeMessage = null;
          }
       }
@@ -920,7 +964,7 @@
       }
 
       private final LinkedListIterator<MessageReference> iterator;
-      
+
       public synchronized void close()
       {
          iterator.close();

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2011-10-14 18:52:56 UTC (rev 11546)
@@ -37,6 +37,7 @@
 import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.ServiceTestBase;
 
 /**
@@ -911,6 +912,123 @@
       internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
    }
 
+   public void testFlowControl() throws Exception
+   {
+      internalTestFlowControlOnRollback(false);
+   }
+   
+   public void testFlowControlLargeMessage() throws Exception
+   {
+      internalTestFlowControlOnRollback(true);
+   }
+   
+   private void internalTestFlowControlOnRollback(final boolean isLargeMessage) throws Exception
+   {
+
+      HornetQServer server = createServer(false, isNetty());
+      
+      AddressSettings settings = new AddressSettings();
+      settings.setMaxDeliveryAttempts(-1);
+      server.getAddressSettingsRepository().addMatch("#", settings);
+
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         server.start();
+
+         locator.setConsumerWindowSize(300000);
+         
+         if (isLargeMessage)
+         {
+            // something to ensure we are using large messages
+            locator.setMinLargeMessageSize(100);
+         }
+         else
+         {
+            // To make sure large messages won't kick in, we set anything large
+            locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+         }
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false, false);
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+         
+         
+         ClientProducer producer = session.createProducer(ADDRESS);
+         
+         for (int i = 0 ; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = session.createMessage(true);
+            msg.putIntProperty("count", i);
+            msg.getBodyBuffer().writeBytes(new byte[1024]);
+            producer.send(msg);
+         }
+         
+         session.commit();
+         
+         ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+         
+         session.start();
+
+         for (int repeat = 0; repeat < 100; repeat ++)
+         {
+            System.out.println("Repeat " + repeat);
+            long timeout = System.currentTimeMillis() + 2000;
+            // At least 10 messages on the buffer
+            while (timeout > System.currentTimeMillis() && consumer.getBufferSize() <= 10)
+            {
+               Thread.sleep(10);
+            }
+            assertTrue(consumer.getBufferSize() >= 10);
+            
+            ClientMessage msg = consumer.receive(500);
+            msg.getBodyBuffer().readByte();
+            assertNotNull(msg);
+            msg.acknowledge();
+            session.rollback();
+         }
+         
+         
+         for (int i = 0 ; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = consumer.receive(5000);
+            assertNotNull(msg);
+            System.out.println("msg " + msg);
+            msg.getBodyBuffer().readByte();
+            msg.acknowledge();
+            session.commit();
+         }
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (server.isStarted())
+         {
+            server.stop();
+         }
+      }
+   }
+
+
+
    public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
    {
 

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-10-14 18:52:56 UTC (rev 11546)
@@ -15,6 +15,9 @@
 
 import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -70,7 +73,136 @@
    {
       return false;
    }
+   
+   public void testRollbackPartiallyConsumedBuffer() throws Exception
+   {
+      for (int i = 0 ; i < 1; i++)
+      {
+         log.info("#test " + i);
+         internalTestRollbackPartiallyConsumedBuffer(false);
+         tearDown();
+         setUp();
+         
+      }
+      
+   }
+   
+   public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
+   {
+      internalTestRollbackPartiallyConsumedBuffer(true);
+   }
+   
+   
+   private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
+   {
+      final int messageSize = 100 * 1024;
+      
 
+      final ClientSession session;
+
+      try
+      {
+         server = createServer(true, isNetty());
+         
+         AddressSettings settings = new AddressSettings();
+         if (redeliveryDelay)
+         {
+            settings.setRedeliveryDelay(1000);
+            if (locator.isCompressLargeMessage())
+            {
+               locator.setConsumerWindowSize(0);
+            }
+         }
+         settings.setMaxDeliveryAttempts(-1);
+         
+         server.getAddressSettingsRepository().addMatch("#", settings);
+
+         server.start();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         for (int i = 0 ; i < 20; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+            
+            clientFile.putIntProperty("value", i);
+   
+            producer.send(clientFile);
+         }
+
+         session.commit();
+
+         session.start();
+         
+         final CountDownLatch latch = new CountDownLatch(1);
+         
+         final AtomicInteger errors = new AtomicInteger(0);
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+         
+         consumer.setMessageHandler(new MessageHandler()
+         {
+            int counter = 0;
+            public void onMessage(ClientMessage message)
+            {
+               message.getBodyBuffer().readByte();
+               System.out.println("message:" + message);
+               try
+               {
+                  if (counter ++ <  20)
+                  {
+                     Thread.sleep(100);
+                     System.out.println("Rollback");
+                     message.acknowledge();
+                     session.rollback();
+                  }
+                  else
+                  {
+                     message.acknowledge();
+                     session.commit();
+                  }
+                  
+                  if (counter == 40)
+                  {
+                     latch.countDown();
+                  }
+               }
+               catch (Exception e)
+               {
+                  latch.countDown();
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+            }
+         });
+         
+         assertTrue(latch.await(40, TimeUnit.SECONDS));
+
+         consumer.close();
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testCloseConsumer() throws Exception
    {
       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -124,7 +256,7 @@
       {
          try
          {
-            server.stop();
+            session.close();
          }
          catch (Throwable ignored)
          {
@@ -132,7 +264,7 @@
 
          try
          {
-            session.close();
+            server.stop();
          }
          catch (Throwable ignored)
          {
@@ -500,16 +632,17 @@
          ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
 
          ClientMessage msg1 = consumerExpiry.receive(5000);
+         assertTrue(msg1.isLargeMessage());
          Assert.assertNotNull(msg1);
          msg1.acknowledge();
 
-         session.rollback();
-
          for (int j = 0; j < messageSize; j++)
          {
             Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
          }
 
+         session.rollback();
+
          consumerExpiry.close();
 
          for (int i = 0; i < 10; i++)
@@ -521,13 +654,13 @@
             Assert.assertNotNull(msg1);
             msg1.acknowledge();
 
-            session.rollback();
-
             for (int j = 0; j < messageSize; j++)
             {
                Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
             }
 
+            session.rollback();
+
             consumerExpiry.close();
          }
 
@@ -638,13 +771,13 @@
          Assert.assertNotNull(msg1);
          msg1.acknowledge();
 
-         session.rollback();
-
          for (int j = 0; j < messageSize; j++)
          {
             Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
          }
 
+         session.rollback();
+
          consumerExpiry.close();
 
          for (int i = 0; i < 10; i++)
@@ -655,13 +788,13 @@
             Assert.assertNotNull(msg1);
             msg1.acknowledge();
 
-            session.rollback();
-
             for (int j = 0; j < messageSize; j++)
             {
                Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
             }
 
+            session.rollback();
+
             consumerExpiry.close();
          }
 
@@ -1892,6 +2025,7 @@
 
          ClientConsumer consumer = session.createConsumer(queue[1]);
          ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+         msg.getBodyBuffer().readByte();
          Assert.assertNull(consumer.receiveImmediate());
          Assert.assertNotNull(msg);
 



More information about the hornetq-commits mailing list