Author: clebert.suconic(a)jboss.com
Date: 2011-09-09 11:56:27 -0400 (Fri, 09 Sep 2011)
New Revision: 11312
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java
Log:
HORNETQ-765 - Fix on paging / depaging with GC in place
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -47,6 +47,8 @@
private Long deliveryTime = null;
private int persistedCount;
+
+ private int messageEstimate;
private AtomicInteger deliveryCount = new AtomicInteger(0);
@@ -84,6 +86,7 @@
final PageSubscription subscription)
{
this.position = position;
+ this.messageEstimate = message.getMessage().getMemoryEstimate();
this.message = new WeakReference<PagedMessage>(message);
this.subscription = subscription;
}
@@ -102,8 +105,18 @@
{
return persistedCount;
}
+
/* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+ */
+ public int getMessageMemoryEstimate()
+ {
+ return messageEstimate;
+ }
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
*/
public MessageReference copy(final Queue queue)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java 2011-09-09
15:07:29 UTC (rev 11311)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Consumer.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -27,4 +27,6 @@
HandleStatus handle(MessageReference reference) throws Exception;
Filter getFilter();
+
+ String debug();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -32,6 +32,13 @@
boolean isPaged();
ServerMessage getMessage();
+
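+ /**
+ * Returns the memory estimate of the message held by this reference.
+ * We define this method on the reference because on paging we need to hold the
+ * original estimate, so paged references need to perform some extra steps.
+ * @return the memory estimate of the referenced message
+ */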
+ int getMessageMemoryEstimate();
MessageReference copy(Queue queue);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -222,6 +222,11 @@
notificationService.sendNotification(notification);
}
}
+
+ public String debug()
+ {
+ return toString();
+ }
private void cancelRefs()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -76,6 +76,11 @@
{
return null;
}
+
+ public String debug()
+ {
+ return toString();
+ }
public synchronized void start()
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -260,5 +260,13 @@
{
ref.getQueue().acknowledge(tx, this);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+ */
+ public int getMessageMemoryEstimate()
+ {
+ return ref.getMessage().getMemoryEstimate();
+ }
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -186,7 +186,17 @@
{
queue.acknowledge(tx, this);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.MessageReference#getMessageMemoryEstimate()
+ */
+ public int getMessageMemoryEstimate()
+ {
+ return message.getMemoryEstimate();
+ }
+
+
// Public --------------------------------------------------------
@Override
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -13,6 +13,8 @@
package org.hornetq.core.server.impl;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -75,7 +77,7 @@
public class QueueImpl implements Queue
{
private static final Logger log = Logger.getLogger(QueueImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@@ -107,15 +109,20 @@
private final LinkedListIterator<PagedReference> pageIterator;
- private final ConcurrentLinkedQueue<MessageReference> concurrentQueue = new
ConcurrentLinkedQueue<MessageReference>();
+ // Messages will first enter intermediateMessageReferences
+ // Before they are added to messageReferences
+ // This is to avoid locking the queue on the producer
+ private final ConcurrentLinkedQueue<MessageReference>
intermediateMessageReferences = new ConcurrentLinkedQueue<MessageReference>();
+ // This is where messages are stored
private final PriorityLinkedList<MessageReference> messageReferences = new
PriorityLinkedListImpl<MessageReference>(QueueImpl.NUM_PRIORITIES);
- // The quantity of pagedReferences on messageREferences priority list
+ // The quantity of pagedReferences on messageReferences priority list
private final AtomicInteger pagedReferences = new AtomicInteger(0);
// The estimate of memory being consumed by this queue. Used to calculate instances of
messages to depage
private final AtomicInteger queueMemorySize = new AtomicInteger(0);
+ private final AtomicInteger queueInstances = new AtomicInteger(0);
private final List<ConsumerHolder> consumerList = new
ArrayList<ConsumerHolder>();
@@ -170,6 +177,48 @@
private volatile boolean checkDirect;
private volatile boolean directDeliver = true;
+
+   /**
+    * Builds a multi-line, human-readable snapshot of this queue for troubleshooting.
+    * <p>
+    * The report lists the current queueMemorySize, one line per registered consumer
+    * (as described by the consumer's own debug()), every reference still held in
+    * intermediateMessageReferences and every reference held in messageReferences.
+    * <p>
+    * The report is only returned; it is no longer echoed to System.out, because callers
+    * already print or log the returned text and the output was being duplicated.
+    *
+    * @return the debug report, one entry per line
+    */
+   public String debug()
+   {
+      StringWriter str = new StringWriter();
+      PrintWriter out = new PrintWriter(str);
+
+      out.println("queueMemorySize=" + queueMemorySize);
+
+      // NOTE(review): consumerList is read here without holding the queue lock - confirm this is
+      // acceptable for a diagnostic-only method
+      for (ConsumerHolder holder : consumerList)
+      {
+         out.println("consumer: " + holder.consumer.debug());
+      }
+
+      for (MessageReference reference : intermediateMessageReferences)
+      {
+         // println (not print) so that every reference lands on its own line
+         out.println("Intermediate reference:" + reference);
+      }
+
+      if (intermediateMessageReferences.isEmpty())
+      {
+         out.println("No intermediate references");
+      }
+
+      boolean foundRef = false;
+      Iterator<MessageReference> iter = messageReferences.iterator();
+      while (iter.hasNext())
+      {
+         foundRef = true;
+         out.println("reference = " + iter.next());
+      }
+
+      if (!foundRef)
+      {
+         out.println("No permanent references on queue");
+      }
+
+      out.flush();
+
+      return str.toString();
+   }
public QueueImpl(final long id,
final SimpleString address,
@@ -342,7 +391,7 @@
public synchronized void reload(final MessageReference ref)
{
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
if (!scheduledDeliveryHandler.checkAndSchedule(ref, true))
{
internalAddTail(ref);
@@ -376,7 +425,7 @@
if (checkDirect)
{
if (direct && !directDeliver &&
- concurrentQueue.isEmpty() &&
+ intermediateMessageReferences.isEmpty() &&
messageReferences.isEmpty() &&
!pageIterator.hasNext() &&
!pageSubscription.isPaging())
@@ -397,9 +446,10 @@
return;
}
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ // We only add queueMemorySize if not being delivered directly
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
- concurrentQueue.add(ref);
+ intermediateMessageReferences.add(ref);
directDeliver = false;
@@ -498,6 +548,11 @@
public synchronized void addConsumer(final Consumer consumer) throws Exception
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " adding consumer " + consumer);
+ }
+
cancelRedistributor();
if (consumer.getFilter() != null)
@@ -1534,14 +1589,14 @@
*/
private void internalAddHead(final MessageReference ref)
{
- queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
refAdded(ref);
messageReferences.addHead(ref, ref.getMessage().getPriority());
}
private synchronized void doPoll()
{
- MessageReference ref = concurrentQueue.poll();
+ MessageReference ref = intermediateMessageReferences.poll();
if (ref != null)
{
@@ -1564,6 +1619,11 @@
{
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " doing deliver. messageReferences=" +
messageReferences.size());
+ }
int busyCount = 0;
@@ -1698,6 +1758,10 @@
if (nullRefCount + busyCount == size)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::All the consumers were busy, giving up
now");
+ }
break;
}
@@ -1723,7 +1787,7 @@
*/
private void refRemoved(MessageReference ref)
{
- queueMemorySize.addAndGet(-ref.getMessage().getMemoryEstimate());
+ queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
if (ref.isPaged())
{
pagedReferences.decrementAndGet();
@@ -1773,6 +1837,8 @@
log.trace("QueueMemorySize before depage on queue=" + this.getName() +
" is " + queueMemorySize.get());
}
+ this.directDeliver = false;
+
int depaged = 0;
while (timeout > System.currentTimeMillis() && queueMemorySize.get()
< maxSize && pageIterator.hasNext())
{
@@ -1786,14 +1852,26 @@
pageIterator.remove();
}
- if (isTrace)
+ if (log.isDebugEnabled())
{
if (depaged == 0 && queueMemorySize.get() >= maxSize)
{
- log.trace("Couldn't depage any message as the maxSize on the queue
was achieved. There are too many pending messages to be acked in reference to the page
configuration");
+ log.debug("Couldn't depage any message as the maxSize on the queue
was achieved. " + "There are too many pending messages to be acked in reference
to the page configuration");
}
-
- log.trace("Queue Memory Size after depage on queue="+this.getName() +
" is " + queueMemorySize.get() + " with maxSize = " + maxSize +
". Depaged " + depaged + " messages");
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Queue Memory Size after depage on queue=" +
this.getName() +
+ " is " +
+ queueMemorySize.get() +
+ " with maxSize = " +
+ maxSize +
+ ". Depaged " +
+ depaged +
+ " messages, pendingDelivery=" + messageReferences.size()
+ ", intermediateMessageReferences= " + intermediateMessageReferences.size() +
+ ", queueDelivering=" + deliveringCount.get());
+
+ }
}
deliverAsync();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -66,13 +66,6 @@
// Static
---------------------------------------------------------------------------------------
- private static final boolean trace = ServerConsumerImpl.log.isTraceEnabled();
-
- private static void trace(final String message)
- {
- ServerConsumerImpl.log.trace(message);
- }
-
// Attributes
-----------------------------------------------------------------------------------
private final long id;
@@ -92,6 +85,11 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
+
+ public String debug()
+ {
+ return toString() + "::Delivering " + this.deliveringRefs.size();
+ }
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets
or being started/stopeed by the session.
@@ -212,6 +210,11 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isDebugEnabled() )
+ {
+ log.debug(this + " is busy for the lack of credits!!!");
+ }
+
return HandleStatus.BUSY;
}
@@ -507,9 +510,9 @@
{
int previous = availableCredits.getAndAdd(credits);
- if (ServerConsumerImpl.trace)
+ if (log.isDebugEnabled())
{
- ServerConsumerImpl.trace("Received " + credits +
+ log.debug(this + "::Received " + credits +
" credits, previous value = " +
previous +
" currentValue = " +
@@ -518,6 +521,10 @@
if (previous <= 0 && previous + credits > 0)
{
+ if (log.isTraceEnabled() )
+ {
+ log.trace(this + "::calling promptDelivery from receiving
credits");
+ }
promptDelivery();
}
}
@@ -793,9 +800,9 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("deliverLargeMessage: Leaving loop of
send LargeMessage because of credits");
+ log.trace("deliverLargeMessage: Leaving loop of send
LargeMessage because of credits");
}
return false;
@@ -818,9 +825,9 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("deliverLargeMessage: Sending " +
packetSize +
+ log.trace("deliverLargeMessage: Sending " + packetSize +
" availableCredits now is " +
availableCredits);
}
@@ -840,9 +847,9 @@
}
}
- if (ServerConsumerImpl.trace)
+ if (ServerConsumerImpl.isTrace)
{
- ServerConsumerImpl.trace("Finished deliverLargeMessage");
+ log.trace("Finished deliverLargeMessage");
}
finish();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/integration/twitter/impl/OutgoingTweetsHandler.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -54,6 +54,12 @@
private Filter filter = null;
private boolean isStarted = false;
+
+
+ public String debug()
+ {
+ return toString();
+ }
public OutgoingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java
(rev 0)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/paging/MultipleConsumersPageStressTest.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -0,0 +1,506 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.paging;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A MultipleConsumersPageStressTest
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class MultipleConsumersPageStressTest extends ServiceTestBase
+{
+
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final static int TIME_TO_RUN = 60 * 1000;
+
+ private static final SimpleString ADDRESS = new SimpleString("page-adr");
+
+ private int numberOfProducers;
+
+ private int numberOfConsumers;
+
+ private QueueImpl pagedServerQueue;
+
+ private boolean shareConnectionFactory = true;
+
+ private boolean openConsumerOnEveryLoop = true;
+
+ private HornetQServer messagingService;
+
+ private ServerLocator sharedLocator;
+
+ private ClientSessionFactory sharedSf;
+
+ final AtomicInteger messagesAvailable = new AtomicInteger(0);
+
+ private volatile boolean runningProducer = true;
+
+ private volatile boolean runningConsumer = true;
+
+ ArrayList<TestProducer> producers = new ArrayList<TestProducer>();
+
+ ArrayList<TestConsumer> consumers = new ArrayList<TestConsumer>();
+
+ ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
+
+ public void testOpenConsumerEveryTimeDefaultFlowControl0() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = true;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+ sharedLocator.setConsumerWindowSize(0);
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ internalMultipleConsumers();
+ }
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ Configuration config = createDefaultConfig();
+
+ HashMap<String, AddressSettings> settings = new HashMap<String,
AddressSettings>();
+
+ // messagingService = createServer(true, config, 10024, 20024, settings);
+ messagingService = createServer(true, config, 10024, 200024, settings);
+ messagingService.start();
+
+ pagedServerQueue = (QueueImpl)messagingService.createQueue(ADDRESS, ADDRESS, null,
true, false);
+
+ }
+
+   /**
+    * Releases everything the test created: the tester sessions, the shared session
+    * factory and locator, and finally the server.
+    * <p>
+    * The shared factory and locator are assigned by the individual test methods, so they
+    * can still be null when a test fails early. Stopping the server and calling
+    * super.tearDown() happen in finally blocks so that a failure while closing client
+    * resources cannot skip them.
+    */
+   @Override
+   public void tearDown() throws Exception
+   {
+      try
+      {
+         for (Tester tst : producers)
+         {
+            tst.close();
+         }
+         for (Tester tst : consumers)
+         {
+            tst.close();
+         }
+         if (sharedSf != null)
+         {
+            sharedSf.close();
+         }
+         if (sharedLocator != null)
+         {
+            sharedLocator.close();
+         }
+      }
+      finally
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         finally
+         {
+            super.tearDown();
+         }
+      }
+   }
+
+ public void testOpenConsumerEveryTimeDefaultFlowControl() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = true;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ System.out.println(pagedServerQueue.debug());
+
+ internalMultipleConsumers();
+
+ }
+
+ public void testReuseConsumersFlowControl0() throws Throwable
+ {
+ shareConnectionFactory = true;
+ openConsumerOnEveryLoop = false;
+ numberOfProducers = 1;
+ numberOfConsumers = 1;
+
+ sharedLocator = createInVMNonHALocator();
+ sharedLocator.setConsumerWindowSize(0);
+
+ sharedSf = sharedLocator.createSessionFactory();
+
+ try
+ {
+ internalMultipleConsumers();
+ }
+ catch (Throwable e)
+ {
+ TestConsumer tstConsumer = consumers.get(0);
+ System.out.println("first retry: " +
tstConsumer.consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+ pagedServerQueue.forceDelivery();
+ System.out.println("Second retry: " +
tstConsumer.consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+
+ tstConsumer.session.commit();
+ System.out.println("Third retry:" +
tstConsumer.consumer.receive(1000));
+
+ tstConsumer.close();
+
+ ClientSession session = sharedSf.createSession();
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ pagedServerQueue.forceDelivery();
+
+ System.out.println("Fourth retry: " + consumer.receive(1000));
+
+ System.out.println(pagedServerQueue.debug());
+
+ throw e;
+ }
+
+ }
+
+ public void internalMultipleConsumers() throws Throwable
+ {
+ for (int i = 0; i < numberOfProducers; i++)
+ {
+ producers.add(new TestProducer());
+ }
+
+ for (int i = 0; i < numberOfConsumers; i++)
+ {
+ consumers.add(new TestConsumer());
+ }
+
+ for (Tester test : producers)
+ {
+ test.start();
+ }
+
+ Thread.sleep(2000);
+
+ for (Tester test : consumers)
+ {
+ test.start();
+ }
+
+ for (Tester test : consumers)
+ {
+ test.join();
+ }
+
+ runningProducer = false;
+
+ for (Tester test : producers)
+ {
+ test.join();
+ }
+
+ for (Throwable e : exceptions)
+ {
+ throw e;
+ }
+
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+   /**
+    * Base class for the producer and consumer threads used by this test.
+    */
+   abstract class Tester extends Thread
+   {
+      Random random = new Random();
+
+      /** Releases the client resources held by this tester. */
+      public abstract void close();
+
+      /** @return true while this tester should keep running its loop */
+      protected abstract boolean enabled();
+
+      /**
+       * Records a failure raised on a tester thread and asks every producer and consumer
+       * to stop.
+       *
+       * @param e the failure caught by the tester thread
+       */
+      protected void exceptionHappened(final Throwable e)
+      {
+         runningConsumer = false;
+         runningProducer = false;
+         e.printStackTrace();
+         // Producer and consumer threads may fail at the same time and ArrayList is not
+         // thread-safe, so the append is guarded
+         synchronized (exceptions)
+         {
+            exceptions.add(e);
+         }
+      }
+
+      /**
+       * Picks a random batch size.
+       *
+       * @return a value between 1 and 19; a random draw of 0 is bumped to 1
+       */
+      public int getNumberOfMessages() throws Exception
+      {
+         return Math.max(1, random.nextInt(20));
+      }
+   }
+
+ class TestConsumer extends Tester
+ {
+
+ public ClientConsumer consumer = null;
+
+ public ClientSession session = null;
+
+ public ServerLocator locator = null;
+
+ public ClientSessionFactory sf = null;
+
+ @Override
+ public void close()
+ {
+ try
+ {
+
+ if (!openConsumerOnEveryLoop)
+ {
+ consumer.close();
+ }
+ session.rollback();
+ session.close();
+
+ if (!shareConnectionFactory)
+ {
+ sf.close();
+ locator.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ }
+
+ @Override
+ protected boolean enabled()
+ {
+ return runningConsumer;
+ }
+
+      /**
+       * Reserves a random batch of messages from the shared messagesAvailable counter.
+       * <p>
+       * If the counter would go negative, the reservation is handed back, the thread waits
+       * one second and then tries again with a new random batch size.
+       *
+       * @return the reserved batch size, or 0 once this consumer is no longer enabled
+       */
+      @Override
+      public int getNumberOfMessages() throws Exception
+      {
+         while (enabled())
+         {
+            int numberOfMessages = super.getNumberOfMessages();
+
+            int resultMessages = messagesAvailable.addAndGet(-numberOfMessages);
+
+            if (resultMessages >= 0)
+            {
+               return numberOfMessages;
+            }
+
+            // Not enough messages produced yet: undo the reservation. The batch has to be added
+            // back here; subtracting it a second time pushes the counter further below zero.
+            messagesAvailable.addAndGet(numberOfMessages);
+            System.out.println("Negative, giving a little wait");
+            Thread.sleep(1000);
+         }
+
+         return 0;
+      }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (shareConnectionFactory)
+ {
+ session = sharedSf.createSession(false, false);
+ }
+ else
+ {
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ }
+
+ long timeOut = System.currentTimeMillis() +
MultipleConsumersPageStressTest.TIME_TO_RUN;
+
+ session.start();
+
+ if (!openConsumerOnEveryLoop)
+ {
+ consumer =
session.createConsumer(MultipleConsumersPageStressTest.ADDRESS);
+ }
+
+ int count = 0;
+
+ while (enabled() && timeOut > System.currentTimeMillis())
+ {
+
+ if (openConsumerOnEveryLoop)
+ {
+ consumer =
session.createConsumer(MultipleConsumersPageStressTest.ADDRESS);
+ }
+
+ int numberOfMessages = getNumberOfMessages();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(10000);
+ if (msg == null)
+ {
+ log.warn("msg " + count +
+ " was null, currentBatchSize=" +
+ numberOfMessages +
+ ", current msg being read=" +
+ i);
+ }
+ Assert.assertNotNull("msg " + count +
+ " was null, currentBatchSize=" +
+ numberOfMessages +
+ ", current msg being read=" +
+ i, msg);
+
+ if (numberOfConsumers == 1 && numberOfProducers == 1)
+ {
+ Assert.assertEquals(count,
msg.getIntProperty("count").intValue());
+ }
+
+ count++;
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ if (openConsumerOnEveryLoop)
+ {
+ consumer.close();
+ }
+
+ }
+ }
+ catch (Throwable e)
+ {
+ exceptionHappened(e);
+ }
+
+ }
+ }
+
+ class TestProducer extends Tester
+ {
+ ClientSession session = null;
+
+ ClientSessionFactory sf = null;
+
+ ServerLocator locator = null;
+
+      /**
+       * Rolls back and closes this producer's session and, when the producer created its
+       * own session factory and locator (shareConnectionFactory is false), closes those too,
+       * mirroring TestConsumer.close().
+       * <p>
+       * Cleanup is best-effort: failures are ignored so that the remaining testers can
+       * still be closed.
+       */
+      @Override
+      public void close()
+      {
+         try
+         {
+            // session stays null when run() failed before creating it
+            if (session != null)
+            {
+               session.rollback();
+               session.close();
+            }
+
+            if (!shareConnectionFactory)
+            {
+               if (sf != null)
+               {
+                  sf.close();
+               }
+               if (locator != null)
+               {
+                  locator.close();
+               }
+            }
+         }
+         catch (Exception ignored)
+         {
+            // best-effort cleanup, nothing useful can be done here
+         }
+
+      }
+
+ @Override
+ protected boolean enabled()
+ {
+ return runningProducer;
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ if (shareConnectionFactory)
+ {
+ session = sharedSf.createSession(false, false);
+ }
+ else
+ {
+ locator = createInVMNonHALocator();
+ sf = locator.createSessionFactory();
+ session = sf.createSession(false, false);
+ }
+
+ ClientProducer prod =
session.createProducer(MultipleConsumersPageStressTest.ADDRESS);
+
+ int count = 0;
+
+ while (enabled())
+ {
+ int numberOfMessages = getNumberOfMessages();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putStringProperty("Test", "This is a simple
test");
+ msg.putIntProperty("count", count++);
+ prod.send(msg);
+ }
+
+ messagesAvailable.addAndGet(numberOfMessages);
+ session.commit();
+ }
+ }
+ catch (Throwable e)
+ {
+ exceptionHappened(e);
+ }
+ }
+ }
+
+}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java 2011-09-09
15:07:29 UTC (rev 11311)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeConsumer.java 2011-09-09
15:56:27 UTC (rev 11312)
@@ -55,6 +55,11 @@
return filter;
}
+ public String debug()
+ {
+ return toString();
+ }
+
public synchronized MessageReference waitForNextReference(long timeout)
{
while (references.isEmpty() && timeout > 0)