Author: timfox
Date: 2009-11-12 08:31:32 -0500 (Thu, 12 Nov 2009)
New Revision: 8271
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
Fixed a message redistribution bug, plus some minor reformatting to match the project code standards.
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-12 13:00:07 UTC (rev 8270)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-12 13:31:32 UTC (rev 8271)
@@ -348,7 +348,7 @@
long redistributionDelay =
addressSettings.getRedistributionDelay();
if (redistributionDelay != -1)
- {
+ {
queue.addRedistributor(redistributionDelay,
redistributorExecutorFactory.getExecutor());
}
}
@@ -418,7 +418,7 @@
long redistributionDelay =
addressSettings.getRedistributionDelay();
if (redistributionDelay != -1)
- {
+ {
queue.addRedistributor(redistributionDelay,
redistributorExecutorFactory.getExecutor());
}
}
@@ -467,8 +467,7 @@
}
String uid = UUIDGenerator.getInstance().generateStringUUID();
- // log.info("sending binding" + binding +" added " + binding.getClusterName() + " binding.getDistance() = " + binding.getDistance() + " " + server.getConfiguration().isBackup());
- //Thread.dumpStack();
+
managementService.sendNotification(new Notification(uid,
NotificationType.BINDING_ADDED, props));
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-12 13:00:07 UTC (rev 8270)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-12 13:31:32 UTC (rev 8271)
@@ -277,7 +277,9 @@
cancelRedistributor();
distributionPolicy.addConsumer(consumer);
+
consumers.add(consumer);
+
if (consumer.getFilter() != null)
{
messageHandlers.put(consumer, new
FilterMessageHandler(messageReferences.iterator()));
@@ -350,6 +352,8 @@
redistributor.stop();
redistributor = null;
+
+ distributionPolicy.removeConsumer(redistributor);
}
if (future != null)
@@ -1048,7 +1052,8 @@
consumer = distributionPolicy.getNextConsumer();
MessageHandler handler = messageHandlers.get(consumer);
- if(handler == null)
+
+ if (handler == null)
{
handler = globalHandler;
}
@@ -1100,6 +1105,7 @@
if (groupID != null)
{
Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
if (groupConsumer != null && groupConsumer != consumer)
{
continue;
@@ -1510,7 +1516,7 @@
return paused;
}
- interface MessageHandler
+ private static interface MessageHandler
{
MessageReference peek(Consumer consumer);
@@ -1519,18 +1525,19 @@
void reset();
}
- class FilterMessageHandler implements MessageHandler
+ private class FilterMessageHandler implements MessageHandler
{
private Iterator<MessageReference> iterator;
- public FilterMessageHandler(Iterator<MessageReference> iterator)
+ public FilterMessageHandler(final Iterator<MessageReference> iterator)
{
this.iterator = iterator;
}
- public MessageReference peek(Consumer consumer)
+ public MessageReference peek(final Consumer consumer)
{
MessageReference reference;
+
if (iterator.hasNext())
{
reference = iterator.next();
@@ -1561,9 +1568,9 @@
}
}
- class NullFilterMessageHandler implements MessageHandler
+ private class NullFilterMessageHandler implements MessageHandler
{
- public MessageReference peek(Consumer consumer)
+ public MessageReference peek(final Consumer consumer)
{
return messageReferences.peekFirst();
}
@@ -1575,7 +1582,7 @@
public void reset()
{
- //no-op
+ // no-op
}
}
}