Author: timfox
Date: 2010-01-09 13:26:20 -0500 (Sat, 09 Jan 2010)
New Revision: 8783
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-263
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-08 16:38:13 UTC (rev 8782)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-09 18:26:20 UTC (rev 8783)
@@ -88,8 +88,6 @@
private final List<MessageHandler> handlers = new ArrayList<MessageHandler>();
- private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
-
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
private boolean direct;
@@ -432,7 +430,7 @@
}
}
- public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
+ public MessageReference removeReferenceWithID(final long id) throws Exception
{
Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -448,8 +446,6 @@
removed = ref;
- removeExpiringReference(removed);
-
break;
}
}
@@ -481,7 +477,7 @@
return ref;
}
- public synchronized MessageReference getReference(final long id)
+ public MessageReference getReference(final long id)
{
Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -633,44 +629,40 @@
{
int count = 0;
- synchronized (this)
- {
+ Transaction tx = new TransactionImpl(storageManager);
- Transaction tx = new TransactionImpl(storageManager);
+ Iterator<MessageReference> iter = messageReferences.iterator();
- Iterator<MessageReference> iter = messageReferences.iterator();
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
- while (iter.hasNext())
+ if (filter == null || filter.match(ref.getMessage()))
{
- MessageReference ref = iter.next();
-
- if (filter == null || filter.match(ref.getMessage()))
- {
- deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- count++;
- }
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ count++;
}
+ }
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
- for (MessageReference messageReference : cancelled)
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ for (MessageReference messageReference : cancelled)
+ {
+ if (filter == null || filter.match(messageReference.getMessage()))
{
- if (filter == null || filter.match(messageReference.getMessage()))
- {
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
- }
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, messageReference);
+ count++;
}
-
- tx.commit();
}
+ tx.commit();
+
return count;
}
- public synchronized boolean deleteReference(final long messageID) throws Exception
+ public boolean deleteReference(final long messageID) throws Exception
{
boolean deleted = false;
@@ -696,7 +688,7 @@
return deleted;
}
- public synchronized boolean expireReference(final long messageID) throws Exception
+ public boolean expireReference(final long messageID) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -714,7 +706,7 @@
return false;
}
- public synchronized int expireReferences(final Filter filter) throws Exception
+ public int expireReferences(final Filter filter) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -738,18 +730,23 @@
return count;
}
- public synchronized void expireReferences() throws Exception
+ public void expireReferences() throws Exception
{
- for (MessageReference expiringMessageReference : expiringMessageReferences)
+ Iterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
{
- if (expiringMessageReference.getMessage().isExpired())
+ MessageReference ref = iter.next();
+ if (ref.getMessage().isExpired())
{
- expireReference(expiringMessageReference.getMessage().getMessageID());
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
}
}
}
- public synchronized boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
+ public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -767,7 +764,7 @@
return false;
}
- public synchronized boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
+ public boolean moveReference(final long messageID, final SimpleString toAddress) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -785,7 +782,7 @@
return false;
}
- public synchronized int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
+ public int moveReferences(final Filter filter, final SimpleString toAddress) throws Exception
{
Transaction tx = new TransactionImpl(storageManager);
@@ -821,7 +818,7 @@
return count;
}
- public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
+ public boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -1165,13 +1162,6 @@
busyCount++;
handler.reset();
-
- // if (groupID != null )
- // {
- // // group id being set seems to make delivery stop
- // // FIXME !!! why??
- // break;
- // }
}
else if (status == HandleStatus.NO_MATCH)
{
@@ -1214,6 +1204,11 @@
return false;
}
+ if (checkExpired(reference))
+ {
+ return true;
+ }
+
int startPos = pos;
int busyCount = 0;
boolean setPromptDelivery = false;
@@ -1223,49 +1218,46 @@
Consumer consumer = handler.getConsumer();
- if (!checkExpired(reference))
+ SimpleString groupID = reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+
+ boolean tryHandle = true;
+
+ if (groupID != null)
{
- SimpleString groupID = reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
- boolean tryHandle = true;
+ if (groupConsumer != null && groupConsumer != consumer)
+ {
+ tryHandle = false;
+ }
+ }
- if (groupID != null)
+ if (tryHandle)
+ {
+ HandleStatus status = handle(reference, consumer);
+
+ if (status == HandleStatus.HANDLED)
{
- Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+ return true;
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ busyCount++;
- if (groupConsumer != null && groupConsumer != consumer)
+ if (groupID != null)
{
- tryHandle = false;
+ return false;
}
}
-
- if (tryHandle)
+ else if (status == HandleStatus.NO_MATCH)
{
- HandleStatus status = handle(reference, consumer);
-
- if (status == HandleStatus.HANDLED)
+ // if consumer filter reject the message make sure it won't be assigned the message group
+ if (groupID != null)
{
- return true;
+ groups.remove(groupID);
}
- else if (status == HandleStatus.BUSY)
- {
- busyCount++;
- if (groupID != null)
- {
- return false;
- }
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- // if consumer filter reject the message make sure it won't be assigned the message group
- if (groupID != null)
- {
- groups.remove(groupID);
- }
-
- setPromptDelivery = true;
- }
+ setPromptDelivery = true;
}
}
@@ -1320,11 +1312,6 @@
if (add)
{
- if (ref.getMessage().getExpiration() != 0)
- {
- expiringMessageReferences.addIfAbsent(ref);
- }
-
if (first)
{
messageReferences.addFirst(ref, ref.getMessage().getPriority());
@@ -1377,14 +1364,6 @@
return status;
}
- private void removeExpiringReference(final MessageReference ref)
- {
- if (ref.getMessage().getExpiration() > 0)
- {
- expiringMessageReferences.remove(ref);
- }
- }
-
private void postAcknowledge(final MessageReference ref) throws Exception
{
final ServerMessage message = ref.getMessage();
@@ -1420,8 +1399,6 @@
}
}
- queue.removeExpiringReference(ref);
-
queue.deliveringCount.decrementAndGet();
message.decrementRefCount(ref);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-01-08 16:38:13 UTC (rev 8782)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-01-09 18:26:20 UTC (rev 8783)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.impl.TestSupportPageStore;
@@ -821,7 +822,106 @@
}
}
+
+ public void testDropMessagesExpiring() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+
+ HashMap<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
+
+ AddressSettings set = new AddressSettings();
+ set.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
+
+ settings.put(PagingTest.ADDRESS.toString(), set);
+
+ HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+
+ server.start();
+
+ final int numberOfMessages = 30000;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setAckBatchSize(0);
+
+ ClientSession session = sf.createSession();
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ class MyHandler implements MessageHandler
+ {
+ int count;
+
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (Exception e)
+ {
+
+ }
+
+ count++;
+
+ if (count % 1000 == 0)
+ {
+ log.info("received " + count);
+ }
+
+ try
+ {
+ message.acknowledge();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ session.start();
+
+ consumer.setMessageHandler(new MyHandler());
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ byte[] body = new byte[1024];
+
+ message = session.createMessage(false);
+ message.getBodyBuffer().writeBytes(body);
+
+ message.setExpiration(System.currentTimeMillis() + 100);
+
+ producer.send(message);
+ }
+
+ session.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
private void internalTestPageMultipleDestinations(final boolean transacted) throws Exception
{
Configuration config = createDefaultConfig();