Author: clebert.suconic(a)jboss.com
Date: 2011-12-10 20:58:20 -0500 (Sat, 10 Dec 2011)
New Revision: 11894
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
JBPAPP-7710 - Back porting fixed from 2.2.8 into 2.2.5 _JBPAPP_7242 as the Customer is
having issues
Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
(rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh 2011-12-11 01:58:20
UTC (rev 11894)
@@ -0,0 +1,2 @@
+#you need to define this variable on mac
+./build.sh -Djdk5.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home/
"$@"
Property changes on: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/mac-build.sh
___________________________________________________________________
Added: svn:executable
+ *
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -400,7 +400,7 @@
{
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<Map<String,
Object>>();
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
LinkedListIterator<MessageReference> iterator = queue.iterator();
try
{
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -107,10 +107,10 @@
public List<PagedMessage> read(StorageManager storage) throws Exception
{
- if (isDebug)
- {
- log.debug("reading page " + this.pageId + " on address = " +
storeName);
- }
+ if (isDebug)
+ {
+ log.debug("reading page " + this.pageId + " on address = " +
storeName);
+ }
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
@@ -212,7 +212,10 @@
public void open() throws Exception
{
- file.open();
+ if (!file.isOpen())
+ {
+ file.open();
+ }
size.set((int)file.size());
file.position(0);
}
@@ -307,6 +310,21 @@
{
return otherPage.getPageId() - this.pageId;
}
+
+ public void finalize()
+ {
+ try
+ {
+ if (file != null && file.isOpen())
+ {
+ file.close();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
/* (non-Javadoc)
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -730,6 +730,12 @@
{
return notificationLock;
}
+
+ // For tests
+ public AddressManager getAddressManager()
+ {
+ return addressManager;
+ }
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString
address) throws Exception
{
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/Queue.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -167,7 +167,7 @@
void resetAllIterators();
- boolean blockOnExecutorFuture();
+ boolean flushExecutor();
void close() throws Exception;
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -24,11 +24,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -428,7 +431,7 @@
{
// We must block on the executor to ensure any async deliveries have
completed or we might get out of order
// deliveries
- if (blockOnExecutorFuture())
+ if (flushExecutor())
{
// Go into direct delivery mode
directDeliver = true;
@@ -449,7 +452,7 @@
directDeliver = false;
- executor.execute(concurrentPoller);
+ getExecutor().execute(concurrentPoller);
}
public void forceDelivery()
@@ -458,14 +461,14 @@
{
if (isTrace)
{
- log.trace("Force delivery scheduling depage");
+ log.trace("Force delivery scheduling depage");
}
scheduleDepage();
}
if (isTrace)
{
- log.trace("Force delivery deliverying async");
+ log.trace("Force delivery deliverying async");
}
deliverAsync();
@@ -473,7 +476,13 @@
public void deliverAsync()
{
- getExecutor().execute(deliverRunner);
+ try
+ {
+ getExecutor().execute(deliverRunner);
+ }
+ catch (RejectedExecutionException ignored)
+ {
+ }
}
public void close() throws Exception
@@ -483,7 +492,20 @@
checkQueueSizeFuture.cancel(false);
}
- cancelRedistributor();
+ getExecutor().execute(new Runnable(){
+ public void run()
+ {
+ try
+ {
+ cancelRedistributor();
+ }
+ catch (Exception e)
+ {
+ // nothing that could be done anyway.. just logging
+ log.warn(e.getMessage(), e);
+ }
+ }
+ });
}
public Executor getExecutor()
@@ -504,14 +526,14 @@
{
deliverAsync();
- blockOnExecutorFuture();
+ flushExecutor();
}
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
Future future = new Future();
- executor.execute(future);
+ getExecutor().execute(future);
boolean ok = future.await(10000);
@@ -780,9 +802,31 @@
public long getMessageCount()
{
- blockOnExecutorFuture();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicLong count = new AtomicLong(0);
- return getInstantMessageCount();
+ getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ count.set(getInstantMessageCount());
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ if (!latch.await(10, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Timed out on waiting for
MessageCount");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ return count.get();
}
public long getInstantMessageCount()
@@ -907,6 +951,7 @@
public synchronized void cancel(final MessageReference reference, final long timeBase)
throws Exception
{
+ deliveringCount.decrementAndGet();
if (checkRedelivery(reference, timeBase))
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false))
@@ -924,7 +969,7 @@
{
if (isTrace)
{
- log.trace("moving expired reference " + ref + " to address =
" + expiryAddress + " from queue=" + this.getName(), new Exception
("trace"));
+ log.trace("moving expired reference " + ref + " to address =
" + expiryAddress + " from queue=" + this.getName());
}
move(expiryAddress, ref, true, false);
}
@@ -950,9 +995,31 @@
public long getMessagesAdded()
{
- blockOnExecutorFuture();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicLong count = new AtomicLong(0);
- return getInstantMessagesAdded();
+ getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ count.set(getInstantMessagesAdded());
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ if (!latch.await(10, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Timed out on waiting for
MessagesAdded");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ return count.get();
}
public long getInstantMessagesAdded()
@@ -1137,28 +1204,43 @@
}
}
- public synchronized void expireReferences() throws Exception
+ public void expireReferences() throws Exception
{
- LinkedListIterator<MessageReference> iter = iterator();
-
- try
- {
- while (iter.hasNext())
+ getExecutor().execute(new Runnable(){
+ public void run()
{
- MessageReference ref = iter.next();
- if (ref.getMessage().isExpired())
+ synchronized (QueueImpl.this)
{
- deliveringCount.incrementAndGet();
- expire(ref);
- iter.remove();
- refRemoved(ref);
+ LinkedListIterator<MessageReference> iter = iterator();
+
+ try
+ {
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ try
+ {
+ if (ref.getMessage().isExpired())
+ {
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
+ refRemoved(ref);
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Error expiring reference " + ref, e);
+ }
+ }
+ }
+ finally
+ {
+ iter.close();
+ }
}
}
- }
- finally
- {
- iter.close();
- }
+ });
}
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID)
throws Exception
@@ -1486,7 +1568,7 @@
@Override
public String toString()
{
- return "QueueImpl[name=" + name.toString() + "]@" +
Integer.toHexString(System.identityHashCode(this));
+ return "QueueImpl[name=" + name.toString() + ", postOffice=" +
this.postOffice + "]@" + Integer.toHexString(System.identityHashCode(this));
}
// Private
@@ -1530,166 +1612,168 @@
// This method will deliver as many messages as possible until all consumers are busy
or there are no more matching
// or available messages
- private synchronized void deliver()
+ private void deliver()
{
- if (paused || consumerList.isEmpty())
+ synchronized (this)
{
- return;
- }
-
- if (log.isDebugEnabled())
- {
- log.debug(this + " doing deliver. messageReferences=" +
messageReferences.size());
- }
+ if (paused || consumerList.isEmpty())
+ {
+ return;
+ }
- int busyCount = 0;
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " doing deliver. messageReferences=" +
messageReferences.size());
+ }
- int nullRefCount = 0;
+ int busyCount = 0;
- int size = consumerList.size();
+ int nullRefCount = 0;
- int endPos = pos == size - 1 ? 0 : size - 1;
+ int size = consumerList.size();
- int numRefs = messageReferences.size();
+ int endPos = pos == size - 1 ? 0 : size - 1;
- int handled = 0;
-
- long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
+ int numRefs = messageReferences.size();
- while (handled < numRefs)
- {
- if (handled == MAX_DELIVERIES_IN_LOOP)
- {
- // Schedule another one - we do this to prevent a single thread getting
caught up in this loop for too long
+ int handled = 0;
- deliverAsync();
+ long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
- return;
- }
-
- if (System.currentTimeMillis() > timeout)
+ while (handled < numRefs)
{
- if (isTrace)
+ if (handled == MAX_DELIVERIES_IN_LOOP)
{
- log.trace("delivery has been running for too long. Scheduling another
delivery task now");
- }
-
- deliverAsync();
-
- return;
- }
-
+ // Schedule another one - we do this to prevent a single thread getting
caught up in this loop for too
+ // long
- ConsumerHolder holder = consumerList.get(pos);
+ deliverAsync();
- Consumer consumer = holder.consumer;
+ return;
+ }
- if (holder.iter == null)
- {
- holder.iter = messageReferences.iterator();
- }
-
- MessageReference ref;
-
- if (holder.iter.hasNext())
- {
- ref = holder.iter.next();
- }
- else
- {
- ref = null;
- }
-
-
- if (ref == null)
- {
- nullRefCount++;
- }
- else
- {
- if (checkExpired(ref))
+ if (System.currentTimeMillis() > timeout)
{
if (isTrace)
{
- log.trace("Reference " + ref + " being expired");
+ log.trace("delivery has been running for too long. Scheduling
another delivery task now");
}
- holder.iter.remove();
- refRemoved(ref);
-
- handled++;
+ deliverAsync();
- continue;
+ return;
}
- Consumer groupConsumer = null;
-
- if (isTrace)
+ ConsumerHolder holder = consumerList.get(pos);
+
+ Consumer consumer = holder.consumer;
+
+ if (holder.iter == null)
{
- log.trace("Queue " + this.getName() + " is delivering
reference " + ref);
+ holder.iter = messageReferences.iterator();
}
- // If a group id is set, then this overrides the consumer chosen round-robin
+ MessageReference ref;
- SimpleString groupID =
ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ if (holder.iter.hasNext())
+ {
+ ref = holder.iter.next();
+ }
+ else
+ {
+ ref = null;
+ }
- if (groupID != null)
+ if (ref == null)
{
- groupConsumer = groups.get(groupID);
+ nullRefCount++;
+ }
+ else
+ {
+ if (checkExpired(ref))
+ {
+ if (isTrace)
+ {
+ log.trace("Reference " + ref + " being
expired");
+ }
+ holder.iter.remove();
- if (groupConsumer != null)
+ refRemoved(ref);
+
+ handled++;
+
+ continue;
+ }
+
+ Consumer groupConsumer = null;
+
+ if (isTrace)
{
- consumer = groupConsumer;
+ log.trace("Queue " + this.getName() + " is delivering
reference " + ref);
}
- }
- HandleStatus status = handle(ref, consumer);
+ // If a group id is set, then this overrides the consumer chosen
round-robin
- if (status == HandleStatus.HANDLED)
- {
- holder.iter.remove();
+ SimpleString groupID =
ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
- refRemoved(ref);
+ if (groupID != null)
+ {
+ groupConsumer = groups.get(groupID);
- if (groupID != null && groupConsumer == null)
+ if (groupConsumer != null)
+ {
+ consumer = groupConsumer;
+ }
+ }
+
+ HandleStatus status = handle(ref, consumer);
+
+ if (status == HandleStatus.HANDLED)
{
- groups.put(groupID, consumer);
+ holder.iter.remove();
+
+ refRemoved(ref);
+
+ if (groupID != null && groupConsumer == null)
+ {
+ groups.put(groupID, consumer);
+ }
+
+ handled++;
}
+ else if (status == HandleStatus.BUSY)
+ {
+ holder.iter.repeat();
- handled++;
+ busyCount++;
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ }
}
- else if (status == HandleStatus.BUSY)
- {
- holder.iter.repeat();
- busyCount++;
- }
- else if (status == HandleStatus.NO_MATCH)
+ if (pos == endPos)
{
- }
- }
+ // Round robin'd all
- if (pos == endPos)
- {
- // Round robin'd all
-
- if (nullRefCount + busyCount == size)
- {
- if (log.isDebugEnabled())
+ if (nullRefCount + busyCount == size)
{
- log.debug(this + "::All the consumers were busy, giving up
now");
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::All the consumers were busy, giving up
now");
+ }
+ break;
}
- break;
+
+ nullRefCount = busyCount = 0;
}
- nullRefCount = busyCount = 0;
- }
+ pos++;
- pos++;
-
- if (pos == size)
- {
- pos = 0;
+ if (pos == size)
+ {
+ pos = 0;
+ }
}
}
@@ -1735,7 +1819,7 @@
}
}
- private synchronized void depage()
+ private void depage()
{
depagePending = false;
@@ -1860,7 +1944,7 @@
}
reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
- if (message.isDurable() && durable)
+ if (!reference.isPaged() && message.isDurable() && durable)
{
storageManager.updateScheduledDeliveryTime(reference);
}
@@ -2173,7 +2257,7 @@
}
catch (Exception e)
{
- QueueImpl.log.warn("Unable to decrement reference counting", e);
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
}
}
@@ -2244,6 +2328,10 @@
for (MessageReference ref : refsToAck)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace("rolling back " + ref);
+ }
try
{
if (ref.getQueue().checkRedelivery(ref, timeBase))
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -61,7 +62,7 @@
// Constants
------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Static
---------------------------------------------------------------------------------------
@@ -85,14 +86,12 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
public String debug()
{
return toString() + "::Delivering " + this.deliveringRefs.size();
}
- private boolean largeMessageInDelivery;
-
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets
or being started/stopeed by the session.
*/
@@ -117,7 +116,7 @@
private final Binding binding;
private boolean transferring = false;
-
+
/* As well as consumer credit based flow control, we also tap into TCP flow control
(assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want
to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
@@ -165,11 +164,11 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-
+
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
-
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -187,7 +186,7 @@
{
return id;
}
-
+
public boolean isBrowseOnly()
{
return browseOnly;
@@ -197,12 +196,12 @@
{
return creationTime;
}
-
+
public String getConnectionID()
{
return this.session.getConnectionID().toString();
}
-
+
public String getSessionID()
{
return this.session.getName();
@@ -212,20 +211,23 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (log.isDebugEnabled() )
+ if (log.isDebugEnabled())
{
- log.debug(this + " is busy for the lack of credits!!!");
+ log.debug(this + " is busy for the lack of credits. Current credits =
" +
+ availableCredits +
+ " Can't receive reference " +
+ ref);
}
-
+
return HandleStatus.BUSY;
}
-
-// TODO -
https://jira.jboss.org/browse/HORNETQ-533
-// if (!writeReady.get())
-// {
-// return HandleStatus.BUSY;
-// }
-
+
+ // TODO -
https://jira.jboss.org/browse/HORNETQ-533
+ // if (!writeReady.get())
+ // {
+ // return HandleStatus.BUSY;
+ // }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -238,11 +240,23 @@
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the
lock
- if (largeMessageInDelivery)
+ if (largeMessageDeliverer != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is busy delivering large message " +
+ largeMessageDeliverer +
+ ", can't deliver reference " +
+ ref);
+ }
return HandleStatus.BUSY;
}
+ if (log.isTraceEnabled())
+ {
+ log.trace("Handling reference " + ref);
+ }
+
final ServerMessage message = ref.getMessage();
if (filter != null && !filter.match(message))
@@ -265,7 +279,9 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged())
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&& !ref.getQueue().isInternalQueue())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable()
&&
+ !ref.getQueue().isInternalQueue() &&
+ !ref.isPaged())
{
storageManager.updateDeliveryCount(ref);
}
@@ -306,7 +322,7 @@
public void close(final boolean failed) throws Exception
{
callback.removeReadyListener(this);
-
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -352,8 +368,8 @@
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME,
binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null
? null
- :
filter.getFilterString());
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+ filter == null ? null :
filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -374,39 +390,71 @@
{
promptDelivery();
- Future future = new Future();
+ // JBPAPP-6030 - Using the executor to avoid distributed dead locks
+ messageQueue.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ // We execute this on the same executor to make sure the force delivery
message is written after
+ // any delivery is completed
- messageQueue.getExecutor().execute(future);
+ synchronized (lock)
+ {
+ if (transferring)
+ {
+ // Case it's transferring (reattach), we will retry later
+ messageQueue.getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ forceDelivery(sequence);
+ }
+ });
+ }
+ else
+ {
+ ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
+
+
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
+ forcedDeliveryMessage.setAddress(messageQueue.getName());
+
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ ServerConsumerImpl.log.error("Failed to send forced delivery
message", e);
+ }
+ }
+ });
- boolean ok = future.await(10000);
+ }
- if (!ok)
- {
- log.warn("Timed out waiting for executor");
- }
+ public LinkedList<MessageReference> cancelRefs(final boolean failed,
+ final boolean lastConsumedAsDelivered,
+ final Transaction tx) throws Exception
+ {
+ boolean performACK = lastConsumedAsDelivered;
try
{
- // We execute this on the same executor to make sure the force delivery message
is written after
- // any delivery is completed
-
- ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(), 50);
-
-
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
- forcedDeliveryMessage.setAddress(messageQueue.getName());
-
- callback.sendMessage(forcedDeliveryMessage, id, 0);
+ if (largeMessageDeliverer != null)
+ {
+ largeMessageDeliverer.finish();
+ }
}
- catch (Exception e)
+ catch (Throwable e)
{
- ServerConsumerImpl.log.error("Failed to send forced delivery message",
e);
+ log.warn("Error on resetting large message deliver - " +
largeMessageDeliverer, e);
}
- }
+ finally
+ {
+ largeMessageDeliverer = null;
+ }
- public LinkedList<MessageReference> cancelRefs(final boolean failed, final
boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
- {
- boolean performACK = lastConsumedAsDelivered;
-
LinkedList<MessageReference> refs = new
LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
@@ -427,8 +475,9 @@
{
if (!failed)
{
- //We don't decrement delivery count if the client failed, since
there's a possibility that refs were actually delivered but we just didn't get any
acks for them
- //before failure
+ // We don't decrement delivery count if the client failed, since
there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for
them
+ // before failure
ref.decrementDeliveryCount();
}
@@ -461,21 +510,6 @@
synchronized (lock)
{
this.transferring = transferring;
-
- if (transferring)
- {
- // Now we must wait for any large message delivery to finish
- while (largeMessageInDelivery)
- {
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
}
// Outside the lock
@@ -504,18 +538,23 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + ":: FlowControl::Received disable flow control
message");
+ }
// No flow control
availableCredits = null;
-
- //There may be messages already in the queue
+
+ // There may be messages already in the queue
promptDelivery();
}
else if (credits == 0)
{
- //reset, used on slow consumers
+ // reset, used on slow consumers
+ log.debug(this + ":: FlowControl::Received reset flow control
message");
availableCredits.set(0);
}
else
@@ -524,16 +563,17 @@
if (log.isDebugEnabled())
{
- log.debug(this + "::Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ log.debug(this + "::FlowControl::Received " +
+ credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
{
- if (log.isTraceEnabled() )
+ if (log.isTraceEnabled())
{
log.trace(this + "::calling promptDelivery from receiving
credits");
}
@@ -547,59 +587,103 @@
return messageQueue;
}
- public void acknowledge(final boolean autoCommitAcks, final Transaction tx, final long
messageID) throws Exception
+ public void acknowledge(final boolean autoCommitAcks, Transaction tx, final long
messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including
the one explicitly
// acknowledged
-
- MessageReference ref;
- do
+
+ // We use a transaction here as if the message is not found, we should rollback
anything done
+ // This could eventually happen on retries during transactions, and we need to make
sure we don't ACK things we are not supposed to acknowledge
+
+ boolean startedTransaction = false;
+
+ if (tx == null || autoCommitAcks)
{
- ref = deliveringRefs.poll();
-
- if (ref == null)
+ startedTransaction = true;
+ tx = new TransactionImpl(storageManager);
+ }
+
+ try
+ {
+
+ MessageReference ref;
+ do
{
- throw new IllegalStateException(System.identityHashCode(this) + " Could
not find reference on consumerID=" +
- id +
- ", messageId = " +
- messageID +
- " queue = " +
- messageQueue.getName() +
- " closed = " +
- closed);
+ ref = deliveringRefs.poll();
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("ACKing ref " + ref + " on " + this);
+ }
+
+ if (ref == null)
+ {
+
+ HornetQException e = new HornetQException(HornetQException.ILLEGAL_STATE,
"Could not find reference on consumerID=" +
+ id +
+ ", messageId = " +
+ messageID +
+ " queue = " +
+ messageQueue.getName());
+ throw e;
+ }
+
+ ref.getQueue().acknowledge(tx, ref);
}
-
- if (autoCommitAcks || tx == null)
+ while (ref.getMessage().getMessageID() != messageID);
+
+ if (startedTransaction)
{
- ref.getQueue().acknowledge(ref);
+ tx.commit();
}
+ }
+ catch (HornetQException e)
+ {
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
else
{
- ref.getQueue().acknowledge(tx, ref);
+ tx.markAsRollbackOnly(e);
}
+ throw e;
}
- while (ref.getMessage().getMessageID() != messageID);
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ HornetQException hqex = new HornetQException(HornetQException.ILLEGAL_STATE,
e.getMessage());
+ if (startedTransaction)
+ {
+ tx.rollback();
+ }
+ else
+ {
+ tx.markAsRollbackOnly(hqex);
+ }
+ throw hqex;
+ }
}
-
+
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx,
final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
MessageReference ref = removeReferenceByID(messageID);
-
+
if (ref == null)
{
throw new IllegalStateException("Cannot find ref to ack " +
messageID);
}
-
+
if (autoCommitAcks)
{
ref.getQueue().acknowledge(ref);
@@ -639,13 +723,13 @@
return ref;
}
-
+
public void readyForWriting(final boolean ready)
{
if (ready)
{
writeReady.set(true);
-
+
promptDelivery();
}
else
@@ -664,28 +748,30 @@
private void promptDelivery()
{
- synchronized (lock)
+ // largeMessageDeliverer is aways set inside a lock
+ // if we don't acquire a lock, we will have NPE eventually
+ if (largeMessageDeliverer != null)
{
- // largeMessageDeliverer is aways set inside a lock
- // if we don't acquire a lock, we will have NPE eventually
- if (largeMessageDeliverer != null)
- {
- resumeLargeMessage();
- }
- else
- {
- if (browseOnly)
- {
- messageQueue.getExecutor().execute(browserDeliverer);
- }
- else
- {
- messageQueue.forceDelivery();
- }
- }
+ resumeLargeMessage();
}
+ else
+ {
+ forceDelivery();
+ }
}
+ private void forceDelivery()
+ {
+ if (browseOnly)
+ {
+ messageQueue.getExecutor().execute(browserDeliverer);
+ }
+ else
+ {
+ messageQueue.deliverAsync();
+ }
+ }
+
private void resumeLargeMessage()
{
messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
@@ -693,8 +779,6 @@
private void deliverLargeMessage(final MessageReference ref, final ServerMessage
message) throws Exception
{
- largeMessageInDelivery = true;
-
final LargeMessageDeliverer localDeliverer = new
LargeMessageDeliverer((LargeServerMessage)message, ref);
// it doesn't need lock because deliverLargeMesasge is already inside the
lock()
@@ -713,6 +797,14 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery standard taking " +
+ packetSize +
+ " from credits, available now is " +
+ availableCredits);
+ }
}
}
@@ -729,16 +821,7 @@
{
if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
{
- if (browseOnly)
- {
- messageQueue.getExecutor().execute(browserDeliverer);
- }
- else
- {
- // prompt Delivery only if chunk was finished
-
- messageQueue.deliverAsync();
- }
+ forceDelivery();
}
}
catch (Exception e)
@@ -786,6 +869,12 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery largeMessage
interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+
return false;
}
@@ -794,7 +883,7 @@
context = largeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
-
+
context.open();
sentInitialPacket = true;
@@ -807,6 +896,15 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
// Execute the rest of the large message on a different thread so as not
to tie up the delivery thread
@@ -822,7 +920,8 @@
{
if (ServerConsumerImpl.isTrace)
{
- log.trace("deliverLargeMessage: Leaving loop of send
LargeMessage because of credits");
+ log.trace(this + "::FlowControl::deliverLargeMessage Leaving
loop of send LargeMessage because of credits, available=" +
+ availableCredits);
}
return false;
@@ -845,16 +944,17 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.isTrace)
- {
- log.trace("deliverLargeMessage: Sending " + packetSize +
- " availableCredits now is " +
- availableCredits);
- }
-
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::largeMessage deliver
continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
positionPendingLargeMessage += chunkLen;
@@ -903,8 +1003,6 @@
largeMessageDeliverer = null;
- largeMessageInDelivery = false;
-
largeMessage = null;
}
}
@@ -920,7 +1018,7 @@
}
private final LinkedListIterator<MessageReference> iterator;
-
+
public synchronized void close()
{
iterator.close();
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java
(rev 0)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/MultipleConsumerTest.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -0,0 +1,460 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.core.postoffice.AddressManager;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.QueueBinding;
+import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * A MultipleConsumerTest
+ *
+ * @author clebert
+ *
+ *
+ */
+public class MultipleConsumerTest extends JMSTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ private static final int TIMEOUT_ON_WAIT = 5000;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ protected boolean usePersistence()
+ {
+ return true;
+ }
+
+ volatile boolean running = true;
+
+ private static final long WAIT_ON_SEND = 0;
+
+ CountDownLatch errorLatch = new CountDownLatch(1);
+
+ AtomicInteger numberOfErrors = new AtomicInteger(0);
+
+ public void error(Throwable e)
+ {
+ System.err.println("Error at " + Thread.currentThread().getName());
+ e.printStackTrace();
+ errorLatch.countDown();
+ }
+
+ /**
+ * @param destinationID
+ * @return
+ */
+ public Topic createSampleTopic(int destinationID)
+ {
+ return HornetQJMSClient.createTopic(createTopicName(destinationID));
+ }
+
+ /**
+ * @param destinationID
+ * @return
+ */
+ private String createTopicName(int destinationID)
+ {
+ return "topic-input" + destinationID;
+ }
+
+ /**
+ * @param destinationID
+ * @return
+ */
+ public Queue createSampleQueue(int destinationID)
+ {
+ return HornetQJMSClient.createQueue(createQueueName(destinationID));
+ }
+
+ /**
+ * @param destinationID
+ * @return
+ */
+ private String createQueueName(int destinationID)
+ {
+ return "queue-output-" + destinationID;
+ }
+
+ public class Counter extends Thread
+ {
+ public Counter()
+ {
+ super("Counter-Thread-Simulating-Management");
+ }
+
+ public void run()
+ {
+ try
+ {
+ AddressManager addr =
((PostOfficeImpl)server.getPostOffice()).getAddressManager();
+
+ LinkedList<org.hornetq.core.server.Queue> queues = new
LinkedList<org.hornetq.core.server.Queue>();
+ for (Binding binding : addr.getBindings().values())
+ {
+ if (binding instanceof QueueBinding)
+ {
+ queues.add(((QueueBinding)binding).getQueue());
+ }
+ }
+
+ while (running)
+ {
+ Thread.sleep(1000);
+ for (org.hornetq.core.server.Queue q : queues)
+ {
+ System.out.println("Queue " + q +
+ " has " +
+ q.getInstantMessageCount() +
+ " with " +
+ q.getMessagesAdded() +
+ " with " +
+ q.getConsumerCount() +
+ " consumers");
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ error(e);
+ }
+ }
+ }
+
+ // It will produce to a destination
+ public class ProducerThread extends Thread
+ {
+ Connection conn;
+
+ Session sess;
+
+ Topic topic;
+
+ MessageProducer prod;
+
+ public ProducerThread(Connection conn, int destinationID) throws Exception
+ {
+ this.conn = conn;
+ this.sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ this.topic = createSampleTopic(destinationID);
+ this.prod = sess.createProducer(topic);
+ }
+
+ public void run()
+ {
+ try
+ {
+ while (running)
+ {
+ BytesMessage msg = sess.createBytesMessage();
+ msg.writeBytes(new byte[1024]);
+ prod.send(msg);
+ sess.commit();
+ Thread.sleep(WAIT_ON_SEND);
+ }
+ }
+ catch (Exception e)
+ {
+ error(e);
+ }
+ }
+ }
+
+ // It will bridge from one subscription and send to a queue
+ public class BridgeSubscriberThread extends Thread
+ {
+ Session session;
+
+ MessageProducer prod;
+
+ MessageConsumer cons;
+
+ Topic topic;
+
+ Queue outputQueue;
+
+ public BridgeSubscriberThread(Connection masterConn, int destinationID) throws
Exception
+ {
+ super("Bridge_destination=" + destinationID);
+ topic = createSampleTopic(destinationID);
+ outputQueue = createSampleQueue(destinationID);
+ session = masterConn.createSession(true, Session.SESSION_TRANSACTED);
+ cons = session.createDurableSubscriber(topic, "bridge-on-" +
destinationID);
+
+ prod = session.createProducer(outputQueue);
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
+
+ public void run()
+ {
+ try
+ {
+
+ int i = 0;
+ while (running)
+ {
+ Message msg = cons.receive(TIMEOUT_ON_WAIT);
+
+ if (msg == null)
+ {
+ System.err.println("couldn't receive a message within
TIMEOUT_ON_WAIT miliseconds on " + topic);
+ error(new RuntimeException("Couldn't receive message"));
+ }
+ else
+ {
+ if (i++ % 100 == 0)
+ {
+ System.out.println(Thread.currentThread().getName() + "
received " + i);
+ }
+ prod.send(msg);
+ session.commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ error(e);
+ }
+ }
+ }
+
+ // It will read from a destination, and pretend it finished processing it
+ public class ProcessorThread extends Thread
+ {
+
+ Connection conn;
+
+ Session session;
+
+ MessageConsumer cons;
+
+ Destination dest;
+
+ final long waitOnEachConsume;
+
+ public ProcessorThread(Connection conn,
+ Session sess,
+ Destination dest,
+ MessageConsumer cons,
+ long waitOnEachConsume) throws Exception
+ {
+ super("Processor on " + dest);
+ this.conn = conn;
+ this.session = sess;
+ this.dest = dest;
+ this.cons = cons;
+ this.waitOnEachConsume = waitOnEachConsume;
+ }
+
+ public void run()
+ {
+ int i = 0;
+ try
+ {
+ while (running)
+ {
+ if (waitOnEachConsume != 0)
+ {
+ Thread.sleep(waitOnEachConsume);
+ }
+ Message msg = cons.receive(TIMEOUT_ON_WAIT);
+
+ if (i++ % 100 == 0)
+ {
+ System.out.println(Thread.currentThread().getName() + " processed
" + i);
+ }
+ if (msg == null)
+ {
+ System.err.println("couldn't receive a message on processor
within TIMEOUT_ON_WAIT miliseconds on " + dest);
+ error(new RuntimeException("Couldn't receive message"));
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ error(e);
+ }
+ }
+ }
+
+ // This test requires to be manually tested
+ // At the end this test is throwing an OME for some issue on the test itself.
+ // As long as you see the message "Finished" the test is considered
successfull!
+ public void _testMultipleConsumers() throws Throwable
+ {
+
+ AddressSettings set = new AddressSettings();
+ set.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ set.setPageSizeBytes(10 * 1024);
+ set.setMaxSizeBytes(100 * 1024);
+ server.getAddressSettingsRepository().addMatch("#", set);
+
+ try
+ {
+ int nDestinations = 100;
+
+ for (int i = 0; i < nDestinations; i++)
+ {
+ createTopic(createTopicName(i));
+ createQueue(createQueueName(i));
+ }
+
+ LinkedList<Connection> connections = new LinkedList<Connection>();
+
+ LinkedList<Thread> consumerThreads = new LinkedList<Thread>();
+
+ LinkedList<Thread> producerThreads = new LinkedList<Thread>();
+
+ // start a few simulated external consumers on the topic (1 external
subscription)
+ for (int i = 0; i < nDestinations; i++)
+ {
+ Connection conn = cf.createConnection();
+ conn.setClientID("external-consumer-" + i);
+ conn.start();
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = createSampleTopic(i);
+ MessageConsumer cons = sess.createDurableSubscriber(topic, "ex-" +
i);
+ ProcessorThread proc = new ProcessorThread(conn, sess, topic, cons, 100l);
+ consumerThreads.add(proc);
+
+ connections.add(conn);
+ }
+
+ // uncomment this to read from the output queues
+ for (int i = 0; i < nDestinations; i++)
+ {
+ Connection conncons = cf.createConnection();
+ conncons.setClientID("output-queue" + i);
+ conncons.start();
+ Session sesscons = conncons.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = createSampleQueue(i);
+ MessageConsumer cons = sesscons.createConsumer(queue);
+ ProcessorThread proc = new ProcessorThread(conncons, sesscons, queue, cons,
0l);
+ consumerThreads.add(proc);
+ connections.add(conncons);
+ }
+
+ Connection masterConn = cf.createConnection();
+ connections.add(masterConn);
+ masterConn.setClientID("master-conn");
+ masterConn.start();
+
+ // start the bridges itself
+ for (int i = 0; i < nDestinations; i++)
+ {
+ BridgeSubscriberThread subs = new BridgeSubscriberThread(masterConn, i);
+ consumerThreads.add(subs);
+ }
+
+ // The producers
+ for (int i = 0; i < nDestinations; i++)
+ {
+ Connection prodConn = cf.createConnection();
+ ProducerThread prod = new ProducerThread(prodConn, i);
+ producerThreads.add(prod);
+ }
+
+ for (Thread t : producerThreads)
+ {
+ t.start();
+ }
+
+ // Waiting some time before we start the consumers. To make sure it's
paging
+ Thread.sleep(20000);
+
+ System.out.println("starting consumers now");
+
+ for (Thread t : consumerThreads)
+ {
+ t.start();
+ }
+
+ Counter managerThread = new Counter();
+
+ managerThread.start();
+
+ errorLatch.await(20, TimeUnit.MINUTES);
+
+ assertEquals(0, numberOfErrors.get());
+
+ running = false;
+
+ for (Thread t : consumerThreads)
+ {
+ t.join();
+ }
+
+ for (Thread t : producerThreads)
+ {
+ t.join();
+ }
+
+ for (Connection conn : connections)
+ {
+ conn.close();
+ }
+
+ managerThread.join();
+
+ System.out.println("Finished!!!!");
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -134,7 +134,7 @@
prod.send(msg);
}
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
//Consumer is not started so should go queued
assertFalse(queue.isDirectDeliver());
@@ -157,7 +157,7 @@
prod.send(msg);
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
assertTrue(queue.isDirectDeliver());
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -82,7 +82,7 @@
}
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
return true;
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -617,7 +617,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#blockOnExecutorFuture()
*/
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
// TODO Auto-generated method stub
return false;
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java 2011-12-11
01:43:04 UTC (rev 11893)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/util/JMSTestBase.java 2011-12-11
01:58:20 UTC (rev 11894)
@@ -214,7 +214,7 @@
HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES,
HornetQClient.DEFAULT_CONSUMER_WINDOW_SIZE,
HornetQClient.DEFAULT_CONSUMER_MAX_RATE,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ 100 * 1024, //
HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE,
HornetQClient.DEFAULT_PRODUCER_WINDOW_SIZE,
HornetQClient.DEFAULT_PRODUCER_MAX_RATE,
HornetQClient.DEFAULT_BLOCK_ON_ACKNOWLEDGE,