[jboss-cvs] JBoss Messaging SVN: r6606 - in trunk: src/main/org/jboss/messaging/core/postoffice/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Apr 28 15:04:26 EDT 2009


Author: timfox
Date: 2009-04-28 15:04:26 -0400 (Tue, 28 Apr 2009)
New Revision: 6606

Modified:
   trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
   trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1551

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java	2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java	2009-04-28 19:04:26 UTC (rev 6606)
@@ -24,10 +24,9 @@
 
 import java.util.Collection;
 
-import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.utils.SimpleString;
 
 /**
  * A Bindings
@@ -50,5 +49,5 @@
 
    void setRouteWhenNoConsumers(boolean takePriorityIntoAccount);
 
-   boolean redistribute(ServerMessage message, SimpleString routingName, Transaction tx) throws Exception;
+   boolean redistribute(ServerMessage message, Queue originatingQueue, Transaction tx) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2009-04-28 19:04:26 UTC (rev 6606)
@@ -65,7 +65,7 @@
    
    void route(ServerMessage message, Transaction tx) throws Exception;
    
-   boolean redistribute(ServerMessage message, SimpleString routingName, Transaction tx) throws Exception;
+   boolean redistribute(ServerMessage message, final Queue originatingQueue, Transaction tx) throws Exception;
 
    List<Queue> activate();
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-04-28 19:04:26 UTC (rev 6606)
@@ -38,6 +38,7 @@
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.utils.SimpleString;
@@ -169,12 +170,14 @@
       }
    }
 
-   public boolean redistribute(final ServerMessage message, final SimpleString routingName, final Transaction tx) throws Exception
+   public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
    {
       if (routeWhenNoConsumers)
       {
          return false;
       }
+      
+      SimpleString routingName = originatingQueue.getName();
 
       List<Binding> bindings = routingNameBindingMap.get(routingName);
 
@@ -226,7 +229,7 @@
 
          boolean highPrior = binding.isHighAcceptPriority(message);
 
-         if (highPrior && (filter == null || filter.match(message)))
+         if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message)))
          {
             theBinding = binding;
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-04-28 19:04:26 UTC (rev 6606)
@@ -669,13 +669,13 @@
       route(message, null);
    }
 
-   public boolean redistribute(final ServerMessage message, final SimpleString routingName, final Transaction tx) throws Exception
+   public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
    {
       Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
 
       if (bindings != null)
       {
-         return bindings.redistribute(message, routingName, tx);
+         return bindings.redistribute(message, originatingQueue, tx);
       }
       else
       {

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java	2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java	2009-04-28 19:04:26 UTC (rev 6606)
@@ -139,7 +139,7 @@
       
       final Transaction tx = new TransactionImpl(storageManager);
 
-      boolean routed = postOffice.redistribute(reference.getMessage(), queue.getName(), tx);
+      boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
 
       if (routed)
       {    

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-28 19:04:26 UTC (rev 6606)
@@ -424,11 +424,14 @@
 
       if (delay > 0)
       {
-         DelayedAddRedistributor dar = new DelayedAddRedistributor(executor, replicatingChannel);
-
-         future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
-
-         futures.add(future);
+         if (consumers.size() == 0 && messageReferences.size() > 0)
+         {         
+            DelayedAddRedistributor dar = new DelayedAddRedistributor(executor, replicatingChannel);
+   
+            future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
+   
+            futures.add(future);
+         }
       }
       else
       {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java	2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java	2009-04-28 19:04:26 UTC (rev 6606)
@@ -356,89 +356,95 @@
    
    public void testBackAndForth() throws Exception
    {
-      setupCluster(false);
-
-      startServers(0, 1, 2);
-
-      setupSessionFactory(0, isNetty());
-      setupSessionFactory(1, isNetty());
-      setupSessionFactory(2, isNetty());
+      for (int i = 0; i < 10; i++)
+      {
+         setupCluster(false);
+   
+         startServers(0, 1, 2);
+   
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+         setupSessionFactory(2, isNetty());
+         
+         final String ADDRESS = "queues.testaddress";
+         final String QUEUE = "queue0";
+   
+   
+         createQueue(0, ADDRESS, QUEUE, null, false);
+         createQueue(1, ADDRESS, QUEUE, null, false);
+         createQueue(2, ADDRESS, QUEUE, null, false);
+   
+         addConsumer(0, 0, QUEUE, null);
+   
+         waitForBindings(0, ADDRESS, 1, 1, true);
+         waitForBindings(1, ADDRESS, 1, 0, true);
+         waitForBindings(2, ADDRESS, 1, 0, true);
+   
+         waitForBindings(0, ADDRESS, 2, 0, false);
+         waitForBindings(1, ADDRESS, 2, 1, false);
+         waitForBindings(2, ADDRESS, 2, 1, false);
+   
+         send(0, ADDRESS, 20, false, null);
+         
+         waitForMessages(0, ADDRESS, 20);
+   
+         removeConsumer(0);
+         
+         waitForBindings(0, ADDRESS, 1, 0, true);
+         waitForBindings(1, ADDRESS, 1, 0, true);
+         waitForBindings(2, ADDRESS, 1, 0, true);
+   
+         waitForBindings(0, ADDRESS, 2, 0, false);
+         waitForBindings(1, ADDRESS, 2, 0, false);
+         waitForBindings(2, ADDRESS, 2, 0, false);
+   
+         addConsumer(1, 1, QUEUE, null);
+         
+         waitForBindings(0, ADDRESS, 1, 0, true);
+         waitForBindings(1, ADDRESS, 1, 1, true);
+         waitForBindings(2, ADDRESS, 1, 0, true);
+         
+         waitForMessages(1, ADDRESS, 20);
+         waitForMessages(0, ADDRESS, 0);
+         
+   
+         waitForBindings(0, ADDRESS, 2, 1, false);
+         waitForBindings(1, ADDRESS, 2, 0, false);
+         waitForBindings(2, ADDRESS, 2, 1, false);
+         
+         removeConsumer(1);
+         
+         waitForBindings(0, ADDRESS, 1, 0, true);
+         waitForBindings(1, ADDRESS, 1, 0, true);
+         waitForBindings(2, ADDRESS, 1, 0, true);
+   
+         waitForBindings(0, ADDRESS, 2, 0, false);
+         waitForBindings(1, ADDRESS, 2, 0, false);
+         waitForBindings(2, ADDRESS, 2, 0, false);
+   
+         addConsumer(0, 0, QUEUE, null);
+         
+         waitForBindings(0, ADDRESS, 1, 1, true);
+         waitForBindings(1, ADDRESS, 1, 0, true);
+         waitForBindings(2, ADDRESS, 1, 0, true);
+         
+         waitForBindings(0, ADDRESS, 2, 0, false);
+         waitForBindings(1, ADDRESS, 2, 1, false);
+         waitForBindings(2, ADDRESS, 2, 1, false);
+   
+         waitForMessages(0, ADDRESS, 20);
+         
+         verifyReceiveAll(20, 0);
+         verifyNotReceive(0);
+         
+         addConsumer(1, 1, QUEUE, null);
+         verifyNotReceive(1);
+         removeConsumer(1);
+         
+         tearDown();
+         setUp();
+      }
       
-      final String ADDRESS = "queues.testaddress";
-      final String QUEUE = "queue0";
-
-
-      createQueue(0, ADDRESS, QUEUE, null, false);
-      createQueue(1, ADDRESS, QUEUE, null, false);
-      createQueue(2, ADDRESS, QUEUE, null, false);
-
-      addConsumer(0, 0, QUEUE, null);
-
-      waitForBindings(0, ADDRESS, 1, 1, true);
-      waitForBindings(1, ADDRESS, 1, 0, true);
-      waitForBindings(2, ADDRESS, 1, 0, true);
-
-      waitForBindings(0, ADDRESS, 2, 0, false);
-      waitForBindings(1, ADDRESS, 2, 1, false);
-      waitForBindings(2, ADDRESS, 2, 1, false);
-
-      send(0, ADDRESS, 20, false, null);
-      
-      waitForMessages(0, ADDRESS, 20);
-
-      removeConsumer(0);
-      
-      waitForBindings(0, ADDRESS, 1, 0, true);
-      waitForBindings(1, ADDRESS, 1, 0, true);
-      waitForBindings(2, ADDRESS, 1, 0, true);
-
-      waitForBindings(0, ADDRESS, 2, 0, false);
-      waitForBindings(1, ADDRESS, 2, 0, false);
-      waitForBindings(2, ADDRESS, 2, 0, false);
-
-      addConsumer(1, 1, QUEUE, null);
-      
-      waitForBindings(0, ADDRESS, 1, 0, true);
-      waitForBindings(1, ADDRESS, 1, 1, true);
-      waitForBindings(2, ADDRESS, 1, 0, true);
-      
-      waitForMessages(1, ADDRESS, 20);
-      waitForMessages(0, ADDRESS, 0);
-      
-
-      waitForBindings(0, ADDRESS, 2, 1, false);
-      waitForBindings(1, ADDRESS, 2, 0, false);
-      waitForBindings(2, ADDRESS, 2, 1, false);
-      
-      removeConsumer(1);
-      
-      waitForBindings(0, ADDRESS, 1, 0, true);
-      waitForBindings(1, ADDRESS, 1, 0, true);
-      waitForBindings(2, ADDRESS, 1, 0, true);
-
-      waitForBindings(0, ADDRESS, 2, 0, false);
-      waitForBindings(1, ADDRESS, 2, 0, false);
-      waitForBindings(2, ADDRESS, 2, 0, false);
-
-      addConsumer(0, 0, QUEUE, null);
-      
-      waitForBindings(0, ADDRESS, 1, 1, true);
-      waitForBindings(1, ADDRESS, 1, 0, true);
-      waitForBindings(2, ADDRESS, 1, 0, true);
-      
-      waitForBindings(0, ADDRESS, 2, 0, false);
-      waitForBindings(1, ADDRESS, 2, 1, false);
-      waitForBindings(2, ADDRESS, 2, 1, false);
-
-      waitForMessages(0, ADDRESS, 20);
-      
-      verifyReceiveAll(20, 0);
-      verifyNotReceive(0);
-      
-      addConsumer(1, 1, QUEUE, null);
-      verifyNotReceive(1);
-      removeConsumer(1);
-      
    }
    
    public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception




More information about the jboss-cvs-commits mailing list