Author: clebert.suconic(a)jboss.com
Date: 2011-02-15 17:30:27 -0500 (Tue, 15 Feb 2011)
New Revision: 10209
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
https://issues.jboss.org/browse/JBPAPP-5926 - further fixing the order of scheduled
delivery
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -173,7 +173,7 @@
// This file is linked to another message, deleting the reference where it
belongs on this case
linkMessage.decrementDelayDeletionCount();
}
- else
+ else if (delayDeletionCount.get() <= 0)
{
if (LargeServerMessageImpl.isTrace)
{
@@ -192,18 +192,27 @@
}
}
+ public int incrementRefCount() throws Exception
+ {
+ int value = super.incrementRefCount();
+
+// new Exception("increment, value=" + value +
+// " on msgCount = " +
+// this.getIntProperty("counter-message") +
+// " messageID=" +
+// this.getMessageID()).printStackTrace();
+
+ return value;
+ }
+
+ static int deleted = 0;
+
@Override
public synchronized int decrementRefCount() throws Exception
{
int currentRefCount = super.decrementRefCount();
- // We use <= as this could be used by load.
- // because of a failure, no references were loaded, so we have 0... and we still
need to delete the associated
- // files
- if (delayDeletionCount.get() <= 0)
- {
- checkDelete();
- }
+ checkDelete();
return currentRefCount;
}
@@ -315,7 +324,7 @@
file = storageManager.createFileForLargeMessage(getMessageID(), durable);
file.open();
-
+
bodySize = file.size();
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -466,6 +466,9 @@
public synchronized Binding removeBinding(final SimpleString uniqueName) throws
Exception
{
+
+ addressSettingsRepository.clear();
+
Binding binding = addressManager.removeBinding(uniqueName);
if (binding == null)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-15
21:09:12 UTC (rev 10208)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -68,7 +68,7 @@
void cancel(Transaction tx, MessageReference ref) throws Exception;
- void cancel(MessageReference reference) throws Exception;
+ void cancel(MessageReference reference, long timeBase) throws Exception;
void deliverAsync();
@@ -129,7 +129,7 @@
Collection<Consumer> getConsumers();
- boolean checkRedelivery(MessageReference ref) throws Exception;
+ boolean checkRedelivery(MessageReference ref, long timeBase) throws Exception;
LinkedListIterator<MessageReference> iterator();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/ScheduledDeliveryHandler.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -21,7 +21,7 @@
*/
public interface ScheduledDeliveryHandler
{
- boolean checkAndSchedule(MessageReference ref);
+ boolean checkAndSchedule(MessageReference ref, final boolean tail);
int getScheduledCount();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -189,12 +189,14 @@
}
Queue queue = null;
+
+ long timeBase = System.currentTimeMillis();
for (MessageReference ref2 : list)
{
queue = ref2.getQueue();
- queue.cancel(ref2);
+ queue.cancel(ref2, timeBase);
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -1011,6 +1011,8 @@
public void destroyQueue(final SimpleString queueName, final ServerSession session)
throws Exception
{
+ addressSettingsRepository.clear();
+
Binding binding = postOffice.getBinding(queueName);
if (binding == null)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -317,7 +317,7 @@
/* Called when a message is cancelled back into the queue */
public synchronized void addHead(final MessageReference ref)
{
- if (scheduledDeliveryHandler.checkAndSchedule(ref))
+ if (scheduledDeliveryHandler.checkAndSchedule(ref, false))
{
return;
}
@@ -330,7 +330,7 @@
public synchronized void reload(final MessageReference ref)
{
queueMemorySize.addAndGet(ref.getMessage().getMemoryEstimate());
- if (!scheduledDeliveryHandler.checkAndSchedule(ref))
+ if (!scheduledDeliveryHandler.checkAndSchedule(ref, true))
{
internalAddTail(ref);
}
@@ -347,7 +347,7 @@
public void addTail(final MessageReference ref, final boolean direct)
{
- if (scheduledDeliveryHandler.checkAndSchedule(ref))
+ if (scheduledDeliveryHandler.checkAndSchedule(ref, true))
{
synchronized (this)
{
@@ -806,11 +806,11 @@
getRefsOperation(tx).addAck(reference);
}
- public synchronized void cancel(final MessageReference reference) throws Exception
+ public synchronized void cancel(final MessageReference reference, final long timeBase)
throws Exception
{
- if (checkRedelivery(reference))
+ if (checkRedelivery(reference, timeBase))
{
- if (!scheduledDeliveryHandler.checkAndSchedule(reference))
+ if (!scheduledDeliveryHandler.checkAndSchedule(reference, false))
{
internalAddHead(reference);
}
@@ -1275,10 +1275,6 @@
{
for (ConsumerHolder holder : this.consumerList)
{
- if (holder.iter != null)
- {
- holder.iter.close();
- }
holder.iter = null;
}
}
@@ -1575,7 +1571,7 @@
}
}
- public boolean checkRedelivery(final MessageReference reference) throws Exception
+ public boolean checkRedelivery(final MessageReference reference, final long timeBase)
throws Exception
{
ServerMessage message = reference.getMessage();
@@ -1604,7 +1600,7 @@
if (redeliveryDelay > 0)
{
- reference.setScheduledDeliveryTime(System.currentTimeMillis() +
redeliveryDelay);
+ reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);
if (message.isDurable() && durable)
{
@@ -1987,12 +1983,14 @@
public void afterRollback(final Transaction tx)
{
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new
HashMap<QueueImpl, LinkedList<MessageReference>>();
+
+ long timeBase = System.currentTimeMillis();
for (MessageReference ref : refsToAck)
{
try
{
- if (ref.getQueue().checkRedelivery(ref))
+ if (ref.getQueue().checkRedelivery(ref, timeBase))
{
LinkedList<MessageReference> toCancel =
queueMap.get(ref.getQueue());
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -43,6 +43,8 @@
private final ScheduledExecutorService scheduledExecutor;
+ private final Object lockDelivery = new Object();
+
private LinkedList<MessageReference> scheduledReferences = new
LinkedList<MessageReference>();
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
@@ -50,7 +52,7 @@
this.scheduledExecutor = scheduledExecutor;
}
- public boolean checkAndSchedule(final MessageReference ref)
+ public boolean checkAndSchedule(final MessageReference ref, final boolean tail)
{
long deliveryTime = ref.getScheduledDeliveryTime();
@@ -65,7 +67,14 @@
synchronized (scheduledReferences)
{
- scheduledReferences.add(ref);
+ if (tail)
+ {
+ scheduledReferences.add(ref);
+ }
+ else
+ {
+ scheduledReferences.addFirst(ref);
+ }
}
scheduleDelivery(runnable, deliveryTime);
@@ -162,41 +171,43 @@
HashSet<Queue> queues = new HashSet<Queue>();
LinkedList<MessageReference> references = new
LinkedList<MessageReference>();
- synchronized (scheduledReferences)
+ synchronized (lockDelivery)
{
-
- Iterator<MessageReference> iter = scheduledReferences.iterator();
- while (iter.hasNext())
+ synchronized (scheduledReferences)
{
- MessageReference reference = iter.next();
- if (reference.getScheduledDeliveryTime() == this.scheduledTime)
+
+ Iterator<MessageReference> iter = scheduledReferences.iterator();
+ while (iter.hasNext())
{
- iter.remove();
-
- reference.setScheduledDeliveryTime(0);
-
- references.add(reference);
-
- queues.add(reference.getQueue());
+ MessageReference reference = iter.next();
+ if (reference.getScheduledDeliveryTime() <= this.scheduledTime)
+ {
+ iter.remove();
+
+ reference.setScheduledDeliveryTime(0);
+
+ references.add(reference);
+
+ queues.add(reference.getQueue());
+ }
}
}
- }
-
- for (MessageReference reference : references)
- {
- reference.getQueue().addHead(reference);
- }
-
- // Just to speed up GC
- references.clear();
-
- for (Queue queue : queues)
- {
- synchronized (queue)
+
+ for (MessageReference reference : references)
{
- queue.resetAllIterators();
- queue.deliverAsync();
+ reference.getQueue().addTail(reference);
}
+
+ // Just to speed up GC
+ references.clear();
+
+ for (Queue queue : queues)
+ {
+ synchronized (queue)
+ {
+ queue.deliverAsync();
+ }
+ }
}
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/HierarchicalRepository.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -62,4 +62,6 @@
* clear the repository
*/
void clear();
+
+ int getCacheSize();
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/settings/impl/HierarchicalObjectRepository.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -76,6 +76,11 @@
matches.put(match, match1);
onChange();
}
+
+ public int getCacheSize()
+ {
+ return cache.size();
+ }
/**
* return the value held against the nearest match
@@ -153,6 +158,7 @@
public void removeMatch(final String match)
{
matches.remove(match);
+ new Exception("Clearing cache").printStackTrace();
cache.clear();
onChange();
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -258,7 +258,7 @@
server.getAddressSettingsRepository().clear();
AddressSettings setting = new AddressSettings();
- setting.setRedeliveryDelay(1000);
+ setting.setRedeliveryDelay(500);
server.getAddressSettingsRepository().addMatch("#", setting);
server.start();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -92,6 +92,29 @@
session.close();
}
+
+ public void testMemoryLeakOnAddressSettingForTemporaryQueue() throws Exception
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createTemporaryQueue(address, queue);
+
+ session.close();
+ session = sf.createSession();
+ }
+
+
+ session.close();
+
+ sf.close();
+
+ System.out.println("size = " +
server.getAddressSettingsRepository().getCacheSize());
+
+ assertTrue(server.getAddressSettingsRepository().getCacheSize() < 10);
+ }
+
public void testPaginStoreIsRemovedWhenQueueIsDeleted() throws Exception
{
SimpleString queue = RandomUtil.randomSimpleString();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -166,7 +166,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference)
*/
- public void cancel(final MessageReference reference) throws Exception
+ public void cancel(final MessageReference reference, final long timeBase) throws
Exception
{
// TODO Auto-generated method stub
@@ -211,7 +211,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference)
*/
- public boolean checkRedelivery(final MessageReference ref) throws Exception
+ public boolean checkRedelivery(final MessageReference ref, final long timeBase) throws
Exception
{
// TODO Auto-generated method stub
return false;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-02-15
21:09:12 UTC (rev 10208)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-02-15
22:30:27 UTC (rev 10209)
@@ -494,16 +494,19 @@
File largeMessagesFileDir = new File(getLargeMessagesDir());
// Deleting the file is async... we keep looking for a period of the time until the
file is really gone
- for (int i = 0; i < 100; i++)
+ long timeout = System.currentTimeMillis() + 5000;
+ while (timeout > System.currentTimeMillis() &&
largeMessagesFileDir.listFiles().length != expect)
{
- if (largeMessagesFileDir.listFiles().length != expect)
+ Thread.sleep(100);
+ }
+
+
+ if (expect != largeMessagesFileDir.listFiles().length)
+ {
+ for (File file : largeMessagesFileDir.listFiles())
{
- Thread.sleep(10);
+ System.out.println("File " + file + " still on ");
}
- else
- {
- break;
- }
}
Assert.assertEquals(expect, largeMessagesFileDir.listFiles().length);