[hornetq-commits] JBoss hornetq SVN: r11312 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 9 11:56:27 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-09 11:56:27 -0400 (Fri, 09 Sep 2011)
New Revision: 11312

Added:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
HORNETQ-765 - Fix on paging / depaging with GC in place

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -47,6 +47,8 @@
    private Long deliveryTime = null;
 
    private int persistedCount;
+   
+   private int messageEstimate;
 
    private AtomicInteger deliveryCount = new AtomicInteger(0);
 
@@ -84,6 +86,7 @@
                              final PageSubscription subscription)
    {
       this.position = position;
+      this.messageEstimate = message.getMessage().getMemoryEstimate();
       this.message = new WeakReference<PagedMessage>(message);
       this.subscription = subscription;
    }
@@ -102,8 +105,18 @@
    {
       return persistedCount;
    }
+   
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+    */
+   public int getMessageMemoryEstimate()
+   {
+      return messageEstimate;
+   }
+
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
     */
    public MessageReference copy(final Queue queue)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -27,4 +27,6 @@
    HandleStatus handle(MessageReference reference) throws Exception;
 
    Filter getFilter();
+   
+   String debug();
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -32,6 +32,13 @@
    boolean isPaged();
    
    ServerMessage getMessage();
+   
+   /**
+    * We define this method aggregation here because on paging we need to hold the original estimate,
+    * so we need to perform some extra steps on paging.
+    * @return
+    */
+   int getMessageMemoryEstimate();
 
    MessageReference copy(Queue queue);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -222,6 +222,11 @@
          notificationService.sendNotification(notification);
       }
    }
+   
+   public String debug()
+   {
+      return toString();
+   }
 
    private void cancelRefs()
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -76,6 +76,11 @@
    {
       return null;
    }
+   
+   public String debug()
+   {
+      return toString();
+   }
 
    public synchronized void start()
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -260,5 +260,13 @@
       {
          ref.getQueue().acknowledge(tx, this);
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+       */
+      public int getMessageMemoryEstimate()
+      {
+         return ref.getMessage().getMemoryEstimate();
+      }
    }
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -186,7 +186,17 @@
    {
       queue.acknowledge(tx, this);
    }
+   
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+    */
+   public int getMessageMemoryEstimate()
+   {
+      return message.getMemoryEstimate();
+   }
+
+
    // Public --------------------------------------------------------
 
    @Override

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.server.impl;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -75,7 +77,7 @@
 public class QueueImpl implements Queue
 {
    private static final Logger log = Logger.getLogger(QueueImpl.class);
-   
+
    private static final boolean isTrace = log.isTraceEnabled();
 
    public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@@ -107,15 +109,20 @@
 
    private final LinkedListIterator<PagedReference> pageIterator;
 
-   private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new ConcurrentLinkedQueue<MessageReference>();
+   // Messages will first enter intermediateMessageReferences
+   // Before they are added to messageReferences
+   // This is to avoid locking the queue on the producer
+   private final ConcurrentLinkedQueue<MessageReference> intermediateMessageReferences = new ConcurrentLinkedQueue<MessageReference>();
 
+   // This is where messages are stored
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
 
-   // The quantity of pagedReferences on messageREferences priority list
+   // The quantity of pagedReferences on messageReferences priority list
    private final AtomicInteger pagedReferences = new AtomicInteger(0);
 
    // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
    private final AtomicInteger queueMemorySize = new AtomicInteger(0);
+   private final AtomicInteger queueInstances = new AtomicInteger(0);
 
    private final List<ConsumerHolder> consumerList = new ArrayList<ConsumerHolder>();
 
@@ -170,6 +177,48 @@
    private volatile boolean checkDirect;
 
    private volatile boolean directDeliver = true;
+   
+   public String debug()
+   {
+      StringWriter str = new StringWriter();
+      PrintWriter out = new PrintWriter(str);
+      
+      out.println("queueMemorySize=" + queueMemorySize);
+      
+      for (ConsumerHolder holder : consumerList)
+      {
+         out.println("consumer: " + holder.consumer.debug());
+      }
+      
+      for (MessageReference reference : intermediateMessageReferences)
+      {
+         out.print("Intermediate reference:" + reference);
+      }
+      
+      if (intermediateMessageReferences.isEmpty())
+      {
+         out.println("No intermediate references");
+      }
+      
+      boolean foundRef = false;
+      Iterator<MessageReference> iter = messageReferences.iterator();
+      while (iter.hasNext())
+      {
+         foundRef = true;
+         out.println("reference = " + iter.next());
+      }
+      
+      if (!foundRef)
+      {
+         out.println("No permanent references on queue");
+      }
+      
+      
+      
+      System.out.println(str.toString());
+      
+      return str.toString();
+   }
 
    public QueueImpl(final long id,
                     final SimpleString address,
@@ -342,7 +391,7 @@
 
    public synchronized void reload(final MessageReference ref)
    {
-      queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
       if (!scheduledDeliveryHandler.checkAndSchedule(ref, true))
       {
          internalAddTail(ref);
@@ -376,7 +425,7 @@
       if (checkDirect)
       {
          if (direct && !directDeliver &&
-             concurrentQueue.isEmpty() &&
+             intermediateMessageReferences.isEmpty() &&
              messageReferences.isEmpty() &&
              !pageIterator.hasNext() &&
              !pageSubscription.isPaging())
@@ -397,9 +446,10 @@
          return;
       }
       
-      queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+      // We only add queueMemorySize if not being delivered directly
+      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
 
-      concurrentQueue.add(ref);
+      intermediateMessageReferences.add(ref);
 
       directDeliver = false;
 
@@ -498,6 +548,11 @@
 
    public synchronized void addConsumer(final Consumer consumer) throws Exception
    {
+      if (log.isDebugEnabled())
+      {
+         log.debug(this + " adding consumer " + consumer);
+      }
+      
       cancelRedistributor();
 
       if (consumer.getFilter() != null)
@@ -1534,14 +1589,14 @@
     */
    private void internalAddHead(final MessageReference ref)
    {
-      queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
       refAdded(ref);
       messageReferences.addHead(ref, ref.getMessage().getPriority());
    }
 
    private synchronized void doPoll()
    {
-      MessageReference ref = concurrentQueue.poll();
+      MessageReference ref = intermediateMessageReferences.poll();
 
       if (ref != null)
       {
@@ -1564,6 +1619,11 @@
       {
          return;
       }
+      
+      if (log.isDebugEnabled())
+      {
+         log.debug(this + " doing deliver. messageReferences=" + messageReferences.size());
+      }
 
       int busyCount = 0;
 
@@ -1698,6 +1758,10 @@
 
             if (nullRefCount + busyCount == size)
             {
+               if (log.isDebugEnabled())
+               {
+                   log.debug(this + "::All the consumers were busy, giving up now");
+               }
                break;
             }
 
@@ -1723,7 +1787,7 @@
     */
    private void refRemoved(MessageReference ref)
    {
-      queueMemorySize.addAndGet(-ref.getMessage().getMemoryEstimate());
+      queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
       if (ref.isPaged())
       {
          pagedReferences.decrementAndGet();
@@ -1773,6 +1837,8 @@
          log.trace("QueueMemorySize before depage on queue=" + this.getName() + " is " + queueMemorySize.get());
       }
       
+      this.directDeliver = false;
+      
       int depaged = 0;
       while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext())
       {
@@ -1786,14 +1852,26 @@
          pageIterator.remove();
       }
       
-      if (isTrace)
+      if (log.isDebugEnabled())
       {
          if (depaged == 0 && queueMemorySize.get() >= maxSize)
          {
-            log.trace("Couldn't depage any message as the maxSize on the queue was achieved. There are too many pending messages to be acked in reference to the page configuration");
+            log.debug("Couldn't depage any message as the maxSize on the queue was achieved. " + "There are too many pending messages to be acked in reference to the page configuration");
          }
-         
-         log.trace("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
+          
+         if (log.isDebugEnabled())
+         {
+            log.debug("Queue Memory Size after depage on queue=" + this.getName() +
+                      " is " +
+                      queueMemorySize.get() +
+                      " with maxSize = " +
+                      maxSize +
+                      ". Depaged " +
+                      depaged +
+                      " messages, pendingDelivery=" +  messageReferences.size() + ", intermediateMessageReferences= " + intermediateMessageReferences.size() + 
+                      ", queueDelivering=" + deliveringCount.get());
+            
+         }
       }
       
       deliverAsync();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -66,13 +66,6 @@
 
    // Static ---------------------------------------------------------------------------------------
 
-   private static final boolean trace = ServerConsumerImpl.log.isTraceEnabled();
-
-   private static void trace(final String message)
-   {
-      ServerConsumerImpl.log.trace(message);
-   }
-
    // Attributes -----------------------------------------------------------------------------------
 
    private final long id;
@@ -92,6 +85,11 @@
    private boolean started;
 
    private volatile LargeMessageDeliverer largeMessageDeliverer = null;
+   
+   public String debug()
+   {
+      return toString() + "::Delivering " + this.deliveringRefs.size();
+   }
 
    /**
     * if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
@@ -212,6 +210,11 @@
    {
       if (availableCredits != null && availableCredits.get() <= 0)
       {
+         if (log.isDebugEnabled() )
+         {
+            log.debug(this  + " is busy for the lack of credits!!!");
+         }
+         
          return HandleStatus.BUSY;
       }
       
@@ -507,9 +510,9 @@
       {
          int previous = availableCredits.getAndAdd(credits);
 
-         if (ServerConsumerImpl.trace)
+         if (log.isDebugEnabled())
          {
-            ServerConsumerImpl.trace("Received " + credits +
+            log.debug(this + "::Received " + credits +
                                      " credits, previous value = " +
                                      previous +
                                      " currentValue = " +
@@ -518,6 +521,10 @@
 
          if (previous <= 0 && previous + credits > 0)
          {
+            if (log.isTraceEnabled() )
+            {
+               log.trace(this + "::calling promptDelivery from receiving credits");
+            }
             promptDelivery();
          }
       }
@@ -793,9 +800,9 @@
             {
                if (availableCredits != null && availableCredits.get() <= 0)
                {
-                  if (ServerConsumerImpl.trace)
+                  if (ServerConsumerImpl.isTrace)
                   {
-                     ServerConsumerImpl.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+                     log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
                   }
 
                   return false;
@@ -818,9 +825,9 @@
 
                int chunkLen = body.length;
 
-               if (ServerConsumerImpl.trace)
+               if (ServerConsumerImpl.isTrace)
                {
-                  ServerConsumerImpl.trace("deliverLargeMessage: Sending " + packetSize +
+                  log.trace("deliverLargeMessage: Sending " + packetSize +
                                            " availableCredits now is " +
                                            availableCredits);
                }
@@ -840,9 +847,9 @@
                }
             }
 
-            if (ServerConsumerImpl.trace)
+            if (ServerConsumerImpl.isTrace)
             {
-               ServerConsumerImpl.trace("Finished deliverLargeMessage");
+               log.trace("Finished deliverLargeMessage");
             }
 
             finish();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -54,6 +54,12 @@
    private Filter filter = null;
 
    private boolean isStarted = false;
+   
+   
+   public String debug()
+   {
+      return toString();
+   }
 
    public OutgoingTweetsHandler(final String connectorName,
                                 final Map<String, Object> configuration,

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -0,0 +1,506 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.paging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A MultipleConsumersPageStressTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class MultipleConsumersPageStressTest extends ServiceTestBase
+{
+
+   private final Logger log = Logger.getLogger(this.getClass());
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final static int TIME_TO_RUN = 60 * 1000;
+
+   private static final SimpleString ADDRESS = new SimpleString("page-adr");
+
+   private int numberOfProducers;
+
+   private int numberOfConsumers;
+
+   private QueueImpl pagedServerQueue;
+
+   private boolean shareConnectionFactory = true;
+
+   private boolean openConsumerOnEveryLoop = true;
+
+   private HornetQServer messagingService;
+
+   private ServerLocator sharedLocator;
+
+   private ClientSessionFactory sharedSf;
+
+   final AtomicInteger messagesAvailable = new AtomicInteger(0);
+
+   private volatile boolean runningProducer = true;
+
+   private volatile boolean runningConsumer = true;
+
+   ArrayList<TestProducer> producers = new ArrayList<TestProducer>();
+
+   ArrayList<TestConsumer> consumers = new ArrayList<TestConsumer>();
+
+   ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
+
+   public void testOpenConsumerEveryTimeDefaultFlowControl0() throws Throwable
+   {
+      shareConnectionFactory = true;
+      openConsumerOnEveryLoop = true;
+      numberOfProducers = 1;
+      numberOfConsumers = 1;
+
+      sharedLocator = createInVMNonHALocator();
+      sharedLocator.setConsumerWindowSize(0);
+
+      sharedSf = sharedLocator.createSessionFactory();
+
+      internalMultipleConsumers();
+   }
+
+   @Override
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      Configuration config = createDefaultConfig();
+
+      HashMap<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+
+      // messagingService = createServer(true, config, 10024, 20024, settings);
+      messagingService = createServer(true, config, 10024, 200024, settings);
+      messagingService.start();
+
+      pagedServerQueue = (QueueImpl)messagingService.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+   }
+
+   @Override
+   public void tearDown() throws Exception
+   {
+      for (Tester tst : producers)
+      {
+         tst.close();
+      }
+      for (Tester tst : consumers)
+      {
+         tst.close();
+      }
+      sharedSf.close();
+      sharedLocator.close();
+      messagingService.stop();
+      super.tearDown();
+   }
+
+   public void testOpenConsumerEveryTimeDefaultFlowControl() throws Throwable
+   {
+      shareConnectionFactory = true;
+      openConsumerOnEveryLoop = true;
+      numberOfProducers = 1;
+      numberOfConsumers = 1;
+
+      sharedLocator = createInVMNonHALocator();
+
+      sharedSf = sharedLocator.createSessionFactory();
+
+      System.out.println(pagedServerQueue.debug());
+
+      internalMultipleConsumers();
+
+   }
+
+   public void testReuseConsumersFlowControl0() throws Throwable
+   {
+      shareConnectionFactory = true;
+      openConsumerOnEveryLoop = false;
+      numberOfProducers = 1;
+      numberOfConsumers = 1;
+
+      sharedLocator = createInVMNonHALocator();
+      sharedLocator.setConsumerWindowSize(0);
+
+      sharedSf = sharedLocator.createSessionFactory();
+
+      try
+      {
+         internalMultipleConsumers();
+      }
+      catch (Throwable e)
+      {
+         TestConsumer tstConsumer = consumers.get(0);
+         System.out.println("first retry: " + tstConsumer.consumer.receive(1000));
+
+         System.out.println(pagedServerQueue.debug());
+
+         pagedServerQueue.forceDelivery();
+         System.out.println("Second retry: " + tstConsumer.consumer.receive(1000));
+
+         System.out.println(pagedServerQueue.debug());
+
+
+         tstConsumer.session.commit();
+         System.out.println("Third retry:" + tstConsumer.consumer.receive(1000));
+
+         tstConsumer.close();
+
+         ClientSession session = sharedSf.createSession();
+         session.start();
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         pagedServerQueue.forceDelivery();
+
+         System.out.println("Fourth retry: " + consumer.receive(1000));
+
+         System.out.println(pagedServerQueue.debug());
+         
+         throw e;
+      }
+
+   }
+
+   public void internalMultipleConsumers() throws Throwable
+   {
+      for (int i = 0; i < numberOfProducers; i++)
+      {
+         producers.add(new TestProducer());
+      }
+
+      for (int i = 0; i < numberOfConsumers; i++)
+      {
+         consumers.add(new TestConsumer());
+      }
+
+      for (Tester test : producers)
+      {
+         test.start();
+      }
+
+      Thread.sleep(2000);
+
+      for (Tester test : consumers)
+      {
+         test.start();
+      }
+
+      for (Tester test : consumers)
+      {
+         test.join();
+      }
+
+      runningProducer = false;
+
+      for (Tester test : producers)
+      {
+         test.join();
+      }
+
+      for (Throwable e : exceptions)
+      {
+         throw e;
+      }
+
+   }
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   abstract class Tester extends Thread
+   {
+      Random random = new Random();
+
+      public abstract void close();
+
+      protected abstract boolean enabled();
+
+      protected void exceptionHappened(final Throwable e)
+      {
+         runningConsumer = false;
+         runningProducer = false;
+         e.printStackTrace();
+         exceptions.add(e);
+      }
+
+      public int getNumberOfMessages() throws Exception
+      {
+         int numberOfMessages = random.nextInt(20);
+         if (numberOfMessages <= 0)
+         {
+            return 1;
+         }
+         else
+         {
+            return numberOfMessages;
+         }
+      }
+   }
+
+   class TestConsumer extends Tester
+   {
+
+      public ClientConsumer consumer = null;
+
+      public ClientSession session = null;
+
+      public ServerLocator locator = null;
+
+      public ClientSessionFactory sf = null;
+
+      @Override
+      public void close()
+      {
+         try
+         {
+
+            if (!openConsumerOnEveryLoop)
+            {
+               consumer.close();
+            }
+            session.rollback();
+            session.close();
+
+            if (!shareConnectionFactory)
+            {
+               sf.close();
+               locator.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+      }
+
+      @Override
+      protected boolean enabled()
+      {
+         return runningConsumer;
+      }
+
+      @Override
+      public int getNumberOfMessages() throws Exception
+      {
+         while (enabled())
+         {
+            int numberOfMessages = super.getNumberOfMessages();
+
+            int resultMessages = messagesAvailable.addAndGet(-numberOfMessages);
+
+            if (resultMessages < 0)
+            {
+               messagesAvailable.addAndGet(-numberOfMessages);
+               numberOfMessages = 0;
+               System.out.println("Negative, giving a little wait");
+               Thread.sleep(1000);
+            }
+
+            if (numberOfMessages > 0)
+            {
+               return numberOfMessages;
+            }
+         }
+
+         return 0;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            if (shareConnectionFactory)
+            {
+               session = sharedSf.createSession(false, false);
+            }
+            else
+            {
+               locator = createInVMNonHALocator();
+               sf = locator.createSessionFactory();
+               session = sf.createSession(false, false);
+            }
+
+            long timeOut = System.currentTimeMillis() + MultipleConsumersPageStressTest.TIME_TO_RUN;
+
+            session.start();
+
+            if (!openConsumerOnEveryLoop)
+            {
+               consumer = session.createConsumer(MultipleConsumersPageStressTest.ADDRESS);
+            }
+
+            int count = 0;
+
+            while (enabled() && timeOut > System.currentTimeMillis())
+            {
+
+               if (openConsumerOnEveryLoop)
+               {
+                  consumer = session.createConsumer(MultipleConsumersPageStressTest.ADDRESS);
+               }
+
+               int numberOfMessages = getNumberOfMessages();
+
+               for (int i = 0; i < numberOfMessages; i++)
+               {
+                  ClientMessage msg = consumer.receive(10000);
+                  if (msg == null)
+                  {
+                     log.warn("msg " + count +
+                              " was null, currentBatchSize=" +
+                              numberOfMessages +
+                              ", current msg being read=" +
+                              i);
+                  }
+                  Assert.assertNotNull("msg " + count +
+                                       " was null, currentBatchSize=" +
+                                       numberOfMessages +
+                                       ", current msg being read=" +
+                                       i, msg);
+
+                  if (numberOfConsumers == 1 && numberOfProducers == 1)
+                  {
+                     Assert.assertEquals(count, msg.getIntProperty("count").intValue());
+                  }
+
+                  count++;
+
+                  msg.acknowledge();
+               }
+
+               session.commit();
+
+               if (openConsumerOnEveryLoop)
+               {
+                  consumer.close();
+               }
+
+            }
+         }
+         catch (Throwable e)
+         {
+            exceptionHappened(e);
+         }
+
+      }
+   }
+
+   class TestProducer extends Tester
+   {
+      ClientSession session = null;
+
+      ClientSessionFactory sf = null;
+
+      ServerLocator locator = null;
+
+      @Override
+      public void close()
+      {
+         try
+         {
+            session.rollback();
+            session.close();
+         }
+         catch (Exception ignored)
+         {
+         }
+
+      }
+
+      @Override
+      protected boolean enabled()
+      {
+         return runningProducer;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            if (shareConnectionFactory)
+            {
+               session = sharedSf.createSession(false, false);
+            }
+            else
+            {
+               locator = createInVMNonHALocator();
+               sf = locator.createSessionFactory();
+               session = sf.createSession(false, false);
+            }
+
+            ClientProducer prod = session.createProducer(MultipleConsumersPageStressTest.ADDRESS);
+
+            int count = 0;
+
+            while (enabled())
+            {
+               int numberOfMessages = getNumberOfMessages();
+
+               for (int i = 0; i < numberOfMessages; i++)
+               {
+                  ClientMessage msg = session.createMessage(true);
+                  msg.putStringProperty("Test", "This is a simple test");
+                  msg.putIntProperty("count", count++);
+                  prod.send(msg);
+               }
+
+               messagesAvailable.addAndGet(numberOfMessages);
+               session.commit();
+            }
+         }
+         catch (Throwable e)
+         {
+            exceptionHappened(e);
+         }
+      }
+   }
+
+}

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java	2011-09-09 15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java	2011-09-09 15:56:27 UTC (rev 11312)
@@ -55,6 +55,11 @@
       return filter;
    }
 
+   public String debug()
+   {
+      return toString();
+   }
+
    public synchronized MessageReference waitForNextReference(long timeout)
    {
       while (references.isEmpty() && timeout > 0)



More information about the hornetq-commits mailing list