[hornetq-commits] JBoss hornetq SVN: r10205 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Feb 14 20:04:22 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-02-14 20:04:21 -0500 (Mon, 14 Feb 2011)
New Revision: 10205
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://issues.jboss.org/browse/JBPAPP-5926 - fixing scheduled delivery order of delivery
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-15 01:04:21 UTC (rev 10205)
@@ -129,7 +129,7 @@
Collection<Consumer> getConsumers();
- boolean checkDLQ(MessageReference ref) throws Exception;
+ boolean checkRedelivery(MessageReference ref) throws Exception;
LinkedListIterator<MessageReference> iterator();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-15 01:04:21 UTC (rev 10205)
@@ -808,7 +808,7 @@
public synchronized void cancel(final MessageReference reference) throws Exception
{
- if (checkDLQ(reference))
+ if (checkRedelivery(reference))
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference))
{
@@ -1275,6 +1275,10 @@
{
for (ConsumerHolder holder : this.consumerList)
{
+ if (holder.iter != null)
+ {
+ holder.iter.close();
+ }
holder.iter = null;
}
}
@@ -1571,11 +1575,12 @@
}
}
- public boolean checkDLQ(final MessageReference reference) throws Exception
+ public boolean checkRedelivery(final MessageReference reference) throws Exception
{
ServerMessage message = reference.getMessage();
// TODO: DeliveryCount on paging
+
if (message.isDurable() && durable && !reference.isPaged())
{
storageManager.updateDeliveryCount(reference);
@@ -1585,6 +1590,7 @@
int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
+ // First check DLA
if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
{
sendToDeadLetterAddress(reference);
@@ -1593,13 +1599,17 @@
}
else
{
+ // Second check Redelivery Delay
long redeliveryDelay = addressSettings.getRedeliveryDelay();
if (redeliveryDelay > 0)
{
reference.setScheduledDeliveryTime(System.currentTimeMillis() + redeliveryDelay);
-
- storageManager.updateScheduledDeliveryTime(reference);
+
+ if (message.isDurable() && durable)
+ {
+ storageManager.updateScheduledDeliveryTime(reference);
+ }
}
deliveringCount.decrementAndGet();
@@ -1982,7 +1992,7 @@
{
try
{
- if (ref.getQueue().checkDLQ(ref))
+ if (ref.getQueue().checkRedelivery(ref))
{
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-02-15 01:04:21 UTC (rev 10205)
@@ -13,9 +13,10 @@
package org.hornetq.core.server.impl;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -23,6 +24,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ScheduledDeliveryHandler;
/**
@@ -40,9 +42,9 @@
private static final boolean trace = ScheduledDeliveryHandlerImpl.log.isTraceEnabled();
private final ScheduledExecutorService scheduledExecutor;
+
+ private LinkedList<MessageReference> scheduledReferences = new LinkedList<MessageReference>();
- private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();
-
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
{
this.scheduledExecutor = scheduledExecutor;
@@ -52,18 +54,18 @@
{
long deliveryTime = ref.getScheduledDeliveryTime();
- if (deliveryTime > System.currentTimeMillis() && scheduledExecutor != null)
+ if (deliveryTime > 0 && scheduledExecutor != null)
{
if (ScheduledDeliveryHandlerImpl.trace)
{
ScheduledDeliveryHandlerImpl.log.trace("Scheduling delivery for " + ref + " to occur at " + deliveryTime);
}
- ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref);
+ ScheduledDeliveryRunnable runnable = new ScheduledDeliveryRunnable(ref.getScheduledDeliveryTime());
- synchronized (scheduledRunnables)
+ synchronized (scheduledReferences)
{
- scheduledRunnables.put(ref.getMessage().getMessageID(), runnable);
+ scheduledReferences.add(ref);
}
scheduleDelivery(runnable, deliveryTime);
@@ -75,19 +77,19 @@
public int getScheduledCount()
{
- return scheduledRunnables.size();
+ synchronized (scheduledReferences)
+ {
+ return scheduledReferences.size();
+ }
}
public List<MessageReference> getScheduledReferences()
{
List<MessageReference> refs = new ArrayList<MessageReference>();
- synchronized (scheduledRunnables)
+ synchronized (scheduledReferences)
{
- for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables.values())
- {
- refs.add(scheduledRunnable.getReference());
- }
+ refs.addAll(scheduledReferences);
}
return refs;
}
@@ -96,40 +98,40 @@
{
List<MessageReference> refs = new ArrayList<MessageReference>();
- synchronized (scheduledRunnables)
+ synchronized (scheduledReferences)
{
- Map<Long, ScheduledDeliveryRunnable> copy = new LinkedHashMap<Long, ScheduledDeliveryRunnable>(scheduledRunnables);
- for (ScheduledDeliveryRunnable runnable : copy.values())
+ Iterator<MessageReference> iter = scheduledReferences.iterator();
+
+ while (iter.hasNext())
{
- if (filter == null || filter.match(runnable.getReference().getMessage()))
+ MessageReference ref = iter.next();
+ if (filter.match(ref.getMessage()))
{
- runnable.cancel();
-
- refs.add(runnable.getReference());
+ iter.remove();
+ refs.add(ref);
}
}
- for (MessageReference ref : refs)
- {
- scheduledRunnables.remove(ref.getMessage().getMessageID());
- }
}
return refs;
}
public MessageReference removeReferenceWithID(final long id)
{
- synchronized (scheduledRunnables)
+ synchronized (scheduledReferences)
{
- ScheduledDeliveryRunnable runnable = scheduledRunnables.remove(id);
- if (runnable == null)
+ Iterator<MessageReference> iter = scheduledReferences.iterator();
+ while (iter.hasNext())
{
- return null;
+ MessageReference ref = iter.next();
+ if (ref.getMessage().getMessageID() == id)
+ {
+ iter.remove();
+ return ref;
+ }
}
- else
- {
- return runnable.getReference();
- }
}
+
+ return null;
}
private void scheduleDelivery(final ScheduledDeliveryRunnable runnable, final long deliveryTime)
@@ -137,81 +139,65 @@
long now = System.currentTimeMillis();
long delay = deliveryTime - now;
+
+ if (delay < 0)
+ {
+ delay = 0;
+ }
- Future<?> future = scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
-
- runnable.setFuture(future);
+ scheduledExecutor.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
private class ScheduledDeliveryRunnable implements Runnable
{
- private final MessageReference ref;
+ private final long scheduledTime;
- private volatile Future<?> future;
-
- private boolean cancelled;
-
- public ScheduledDeliveryRunnable(final MessageReference ref)
+ public ScheduledDeliveryRunnable(final long scheduledTime)
{
- this.ref = ref;
+ this.scheduledTime = scheduledTime;
}
- public synchronized void setFuture(final Future<?> future)
- {
- if (cancelled)
- {
- future.cancel(false);
- }
- else
- {
- this.future = future;
- }
- }
-
- public synchronized void cancel()
- {
- if (future != null)
- {
- future.cancel(false);
- }
-
- cancelled = true;
- }
-
- public MessageReference getReference()
- {
- return ref;
- }
-
public void run()
{
- if (ScheduledDeliveryHandlerImpl.trace)
- {
- ScheduledDeliveryHandlerImpl.log.trace("Scheduled delivery timeout " + ref);
- }
+ HashSet<Queue> queues = new HashSet<Queue>();
+ LinkedList<MessageReference> references = new LinkedList<MessageReference>();
- synchronized (scheduledRunnables)
+ synchronized (scheduledReferences)
{
- Object removed = scheduledRunnables.remove(ref.getMessage().getMessageID());
-
- if (removed == null)
+
+ Iterator<MessageReference> iter = scheduledReferences.iterator();
+ while (iter.hasNext())
{
- ScheduledDeliveryHandlerImpl.log.warn("Failed to remove timeout " + this);
+ MessageReference reference = iter.next();
+ if (reference.getScheduledDeliveryTime() == this.scheduledTime)
+ {
+ iter.remove();
- return;
+ reference.setScheduledDeliveryTime(0);
+
+ references.add(reference);
+
+ queues.add(reference.getQueue());
+ }
}
}
-
- ref.setScheduledDeliveryTime(0);
-
- synchronized (ref.getQueue())
+
+ for (MessageReference reference : references)
{
- ref.getQueue().resetAllIterators();
-
- ref.getQueue().addHead(ref);
-
- ref.getQueue().deliverAsync();
+ reference.getQueue().addHead(reference);
}
+
+ // Just to speed up GC
+ references.clear();
+
+ for (Queue queue : queues)
+ {
+ synchronized (queue)
+ {
+ queue.resetAllIterators();
+ queue.deliverAsync();
+ }
+ }
}
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2011-02-15 01:04:21 UTC (rev 10205)
@@ -15,9 +15,15 @@
import junit.framework.Assert;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -31,10 +37,9 @@
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(OrderTest.class);
-
// Attributes ----------------------------------------------------
private HornetQServer server;
@@ -78,7 +83,6 @@
server = createServer(persistent, true);
server.start();
-
locator.setBlockOnNonDurableSend(false);
locator.setBlockOnDurableSend(false);
locator.setBlockOnAcknowledge(true);
@@ -126,7 +130,7 @@
if (!started || started && i % 2 == 0)
{
ClientMessage msg = cons.receive(10000);
-
+
Assert.assertEquals(i, msg.getIntProperty("id").intValue());
}
}
@@ -140,7 +144,7 @@
if (!started || started && i % 2 == 0)
{
ClientMessage msg = cons.receive(10000);
-
+
Assert.assertEquals(i, msg.getIntProperty("id").intValue());
}
}
@@ -170,9 +174,9 @@
public void doTestOverCancel(final boolean persistent) throws Exception
{
server = createServer(persistent, true);
+
server.start();
-
locator.setBlockOnNonDurableSend(false);
locator.setBlockOnDurableSend(false);
locator.setBlockOnAcknowledge(false);
@@ -237,6 +241,94 @@
}
+ public void testOrderOverSessionClosePersistentWithRedeliveryDelay() throws Exception
+ {
+ doTestOverCancelWithRedelivery(true);
+ }
+
+ public void testOrderOverSessionCloseNonPersistentWithRedeliveryDelay() throws Exception
+ {
+ doTestOverCancelWithRedelivery(false);
+ }
+
+
+ public void doTestOverCancelWithRedelivery(final boolean persistent) throws Exception
+ {
+ server = createServer(persistent, true);
+
+ server.getAddressSettingsRepository().clear();
+ AddressSettings setting = new AddressSettings();
+ setting.setRedeliveryDelay(1000);
+ server.getAddressSettingsRepository().addMatch("#", setting);
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(false);
+ locator.setBlockOnAcknowledge(false);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(true, true, 0);
+
+ int numberOfMessages = 500;
+
+ try
+ {
+ session.createQueue("queue", "queue", true);
+
+ ClientProducer prod = session.createProducer("queue");
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(i % 2 == 0);
+ msg.putIntProperty("id", i);
+ prod.send(msg);
+ }
+
+ session.close();
+
+ session = sf.createSession(false, false);;
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer("queue");
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ msg.acknowledge();
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+ session.close();
+
+
+ session = sf.createSession(false, false);;
+
+ session.start();
+
+ cons = session.createConsumer("queue");
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+
+ // receive again
+ session.commit();
+ session.close();
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-14 13:32:17 UTC (rev 10204)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-15 01:04:21 UTC (rev 10205)
@@ -211,7 +211,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference)
*/
- public boolean checkDLQ(final MessageReference ref) throws Exception
+ public boolean checkRedelivery(final MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
return false;
More information about the hornetq-commits
mailing list