[jboss-cvs] JBoss Messaging SVN: r6606 - in trunk: src/main/org/jboss/messaging/core/postoffice/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Apr 28 15:04:26 EDT 2009
Author: timfox
Date: 2009-04-28 15:04:26 -0400 (Tue, 28 Apr 2009)
New Revision: 6606
Modified:
trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1551
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java 2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java 2009-04-28 19:04:26 UTC (rev 6606)
@@ -24,10 +24,9 @@
import java.util.Collection;
-import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.utils.SimpleString;
/**
* A Bindings
@@ -50,5 +49,5 @@
void setRouteWhenNoConsumers(boolean takePriorityIntoAccount);
- boolean redistribute(ServerMessage message, SimpleString routingName, Transaction tx) throws Exception;
+ boolean redistribute(ServerMessage message, Queue originatingQueue, Transaction tx) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-04-28 19:04:26 UTC (rev 6606)
@@ -65,7 +65,7 @@
void route(ServerMessage message, Transaction tx) throws Exception;
- boolean redistribute(ServerMessage message, SimpleString routingName, Transaction tx) throws Exception;
+ boolean redistribute(ServerMessage message, final Queue originatingQueue, Transaction tx) throws Exception;
List<Queue> activate();
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-04-28 19:04:26 UTC (rev 6606)
@@ -38,6 +38,7 @@
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.server.Bindable;
+import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.utils.SimpleString;
@@ -169,12 +170,14 @@
}
}
- public boolean redistribute(final ServerMessage message, final SimpleString routingName, final Transaction tx) throws Exception
+ public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
{
if (routeWhenNoConsumers)
{
return false;
}
+
+ SimpleString routingName = originatingQueue.getName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -226,7 +229,7 @@
boolean highPrior = binding.isHighAcceptPriority(message);
- if (highPrior && (filter == null || filter.match(message)))
+ if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message)))
{
theBinding = binding;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-28 19:04:26 UTC (rev 6606)
@@ -669,13 +669,13 @@
route(message, null);
}
- public boolean redistribute(final ServerMessage message, final SimpleString routingName, final Transaction tx) throws Exception
+ public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
{
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
if (bindings != null)
{
- return bindings.redistribute(message, routingName, tx);
+ return bindings.redistribute(message, originatingQueue, tx);
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java 2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java 2009-04-28 19:04:26 UTC (rev 6606)
@@ -139,7 +139,7 @@
final Transaction tx = new TransactionImpl(storageManager);
- boolean routed = postOffice.redistribute(reference.getMessage(), queue.getName(), tx);
+ boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
if (routed)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-28 19:04:26 UTC (rev 6606)
@@ -424,11 +424,14 @@
if (delay > 0)
{
- DelayedAddRedistributor dar = new DelayedAddRedistributor(executor, replicatingChannel);
-
- future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
-
- futures.add(future);
+ if (consumers.size() == 0 && messageReferences.size() > 0)
+ {
+ DelayedAddRedistributor dar = new DelayedAddRedistributor(executor, replicatingChannel);
+
+ future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
+
+ futures.add(future);
+ }
}
else
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java 2009-04-28 18:28:45 UTC (rev 6605)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java 2009-04-28 19:04:26 UTC (rev 6606)
@@ -356,89 +356,95 @@
public void testBackAndForth() throws Exception
{
- setupCluster(false);
-
- startServers(0, 1, 2);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
+ for (int i = 0; i < 10; i++)
+ {
+ setupCluster(false);
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ final String ADDRESS = "queues.testaddress";
+ final String QUEUE = "queue0";
+
+
+ createQueue(0, ADDRESS, QUEUE, null, false);
+ createQueue(1, ADDRESS, QUEUE, null, false);
+ createQueue(2, ADDRESS, QUEUE, null, false);
+
+ addConsumer(0, 0, QUEUE, null);
+
+ waitForBindings(0, ADDRESS, 1, 1, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 2, 0, false);
+ waitForBindings(1, ADDRESS, 2, 1, false);
+ waitForBindings(2, ADDRESS, 2, 1, false);
+
+ send(0, ADDRESS, 20, false, null);
+
+ waitForMessages(0, ADDRESS, 20);
+
+ removeConsumer(0);
+
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 2, 0, false);
+ waitForBindings(1, ADDRESS, 2, 0, false);
+ waitForBindings(2, ADDRESS, 2, 0, false);
+
+ addConsumer(1, 1, QUEUE, null);
+
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 1, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
+
+ waitForMessages(1, ADDRESS, 20);
+ waitForMessages(0, ADDRESS, 0);
+
+
+ waitForBindings(0, ADDRESS, 2, 1, false);
+ waitForBindings(1, ADDRESS, 2, 0, false);
+ waitForBindings(2, ADDRESS, 2, 1, false);
+
+ removeConsumer(1);
+
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 2, 0, false);
+ waitForBindings(1, ADDRESS, 2, 0, false);
+ waitForBindings(2, ADDRESS, 2, 0, false);
+
+ addConsumer(0, 0, QUEUE, null);
+
+ waitForBindings(0, ADDRESS, 1, 1, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 2, 0, false);
+ waitForBindings(1, ADDRESS, 2, 1, false);
+ waitForBindings(2, ADDRESS, 2, 1, false);
+
+ waitForMessages(0, ADDRESS, 20);
+
+ verifyReceiveAll(20, 0);
+ verifyNotReceive(0);
+
+ addConsumer(1, 1, QUEUE, null);
+ verifyNotReceive(1);
+ removeConsumer(1);
+
+ tearDown();
+ setUp();
+ }
- final String ADDRESS = "queues.testaddress";
- final String QUEUE = "queue0";
-
-
- createQueue(0, ADDRESS, QUEUE, null, false);
- createQueue(1, ADDRESS, QUEUE, null, false);
- createQueue(2, ADDRESS, QUEUE, null, false);
-
- addConsumer(0, 0, QUEUE, null);
-
- waitForBindings(0, ADDRESS, 1, 1, true);
- waitForBindings(1, ADDRESS, 1, 0, true);
- waitForBindings(2, ADDRESS, 1, 0, true);
-
- waitForBindings(0, ADDRESS, 2, 0, false);
- waitForBindings(1, ADDRESS, 2, 1, false);
- waitForBindings(2, ADDRESS, 2, 1, false);
-
- send(0, ADDRESS, 20, false, null);
-
- waitForMessages(0, ADDRESS, 20);
-
- removeConsumer(0);
-
- waitForBindings(0, ADDRESS, 1, 0, true);
- waitForBindings(1, ADDRESS, 1, 0, true);
- waitForBindings(2, ADDRESS, 1, 0, true);
-
- waitForBindings(0, ADDRESS, 2, 0, false);
- waitForBindings(1, ADDRESS, 2, 0, false);
- waitForBindings(2, ADDRESS, 2, 0, false);
-
- addConsumer(1, 1, QUEUE, null);
-
- waitForBindings(0, ADDRESS, 1, 0, true);
- waitForBindings(1, ADDRESS, 1, 1, true);
- waitForBindings(2, ADDRESS, 1, 0, true);
-
- waitForMessages(1, ADDRESS, 20);
- waitForMessages(0, ADDRESS, 0);
-
-
- waitForBindings(0, ADDRESS, 2, 1, false);
- waitForBindings(1, ADDRESS, 2, 0, false);
- waitForBindings(2, ADDRESS, 2, 1, false);
-
- removeConsumer(1);
-
- waitForBindings(0, ADDRESS, 1, 0, true);
- waitForBindings(1, ADDRESS, 1, 0, true);
- waitForBindings(2, ADDRESS, 1, 0, true);
-
- waitForBindings(0, ADDRESS, 2, 0, false);
- waitForBindings(1, ADDRESS, 2, 0, false);
- waitForBindings(2, ADDRESS, 2, 0, false);
-
- addConsumer(0, 0, QUEUE, null);
-
- waitForBindings(0, ADDRESS, 1, 1, true);
- waitForBindings(1, ADDRESS, 1, 0, true);
- waitForBindings(2, ADDRESS, 1, 0, true);
-
- waitForBindings(0, ADDRESS, 2, 0, false);
- waitForBindings(1, ADDRESS, 2, 1, false);
- waitForBindings(2, ADDRESS, 2, 1, false);
-
- waitForMessages(0, ADDRESS, 20);
-
- verifyReceiveAll(20, 0);
- verifyNotReceive(0);
-
- addConsumer(1, 1, QUEUE, null);
- verifyNotReceive(1);
- removeConsumer(1);
-
}
public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception
More information about the jboss-cvs-commits
mailing list