[hornetq-commits] JBoss hornetq SVN: r8271 - in trunk/src/main/org/hornetq/core: server/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 12 08:31:33 EST 2009


Author: timfox
Date: 2009-11-12 08:31:32 -0500 (Thu, 12 Nov 2009)
New Revision: 8271

Modified:
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
Fixed a message redistribution bug, plus some minor reformatting to match the project code standards.

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-12 13:00:07 UTC (rev 8270)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-12 13:31:32 UTC (rev 8271)
@@ -348,7 +348,7 @@
                      long redistributionDelay = addressSettings.getRedistributionDelay();
 
                      if (redistributionDelay != -1)
-                     {
+                     {                        
                         queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
                      }
                   }
@@ -418,7 +418,7 @@
                      long redistributionDelay = addressSettings.getRedistributionDelay();
 
                      if (redistributionDelay != -1)
-                     {
+                     {                       
                         queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
                      }
                   }
@@ -467,8 +467,7 @@
       }
 
       String uid = UUIDGenerator.getInstance().generateStringUUID();
-     // log.info("sending binding" + binding +" added " + binding.getClusterName() + " binding.getDistance() = " + binding.getDistance() + " " + server.getConfiguration().isBackup());
-      //Thread.dumpStack();
+    
       managementService.sendNotification(new Notification(uid, NotificationType.BINDING_ADDED, props));
    }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-12 13:00:07 UTC (rev 8270)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-12 13:31:32 UTC (rev 8271)
@@ -277,7 +277,9 @@
       cancelRedistributor();
 
       distributionPolicy.addConsumer(consumer);
+
       consumers.add(consumer);
+
       if (consumer.getFilter() != null)
       {
          messageHandlers.put(consumer, new FilterMessageHandler(messageReferences.iterator()));
@@ -350,6 +352,8 @@
          redistributor.stop();
 
          redistributor = null;
+
+         distributionPolicy.removeConsumer(redistributor);
       }
 
       if (future != null)
@@ -1048,7 +1052,8 @@
          consumer = distributionPolicy.getNextConsumer();
 
          MessageHandler handler = messageHandlers.get(consumer);
-         if(handler == null)
+
+         if (handler == null)
          {
             handler = globalHandler;
          }
@@ -1100,6 +1105,7 @@
          if (groupID != null)
          {
             Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
             if (groupConsumer != null && groupConsumer != consumer)
             {
                continue;
@@ -1510,7 +1516,7 @@
       return paused;
    }
 
-   interface MessageHandler
+   private static interface MessageHandler
    {
       MessageReference peek(Consumer consumer);
 
@@ -1519,18 +1525,19 @@
       void reset();
    }
 
-   class FilterMessageHandler implements MessageHandler
+   private class FilterMessageHandler implements MessageHandler
    {
       private Iterator<MessageReference> iterator;
 
-      public FilterMessageHandler(Iterator<MessageReference> iterator)
+      public FilterMessageHandler(final Iterator<MessageReference> iterator)
       {
          this.iterator = iterator;
       }
 
-      public MessageReference peek(Consumer consumer)
+      public MessageReference peek(final Consumer consumer)
       {
          MessageReference reference;
+         
          if (iterator.hasNext())
          {
             reference = iterator.next();
@@ -1561,9 +1568,9 @@
       }
    }
 
-   class NullFilterMessageHandler implements MessageHandler
+   private class NullFilterMessageHandler implements MessageHandler
    {
-      public MessageReference peek(Consumer consumer)
+      public MessageReference peek(final Consumer consumer)
       {
          return messageReferences.peekFirst();
       }
@@ -1575,7 +1582,7 @@
 
       public void reset()
       {
-         //no-op
+         // no-op
       }
    }
 }



More information about the hornetq-commits mailing list