Author: clebert.suconic(a)jboss.com
Date: 2011-03-18 02:09:59 -0400 (Fri, 18 Mar 2011)
New Revision: 10344
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6130 /
https://issues.jboss.org/browse/HORNETQ-654
- fixing Redistribution issue over restart
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-18
04:54:45 UTC (rev 10343)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-03-18
06:09:59 UTC (rev 10344)
@@ -674,6 +674,12 @@
public boolean redistribute(final ServerMessage message, final Queue originatingQueue,
final Transaction tx) throws Exception
{
+
+ // We have to copy the message and store it separately, otherwise we may lose
remote bindings in case of restart before the message
+ // arrived the target node
+ // as described on
https://issues.jboss.org/browse/JBPAPP-6130
+ ServerMessage copyRedistribute = message.copy(storageManager.generateUniqueID());
+
Bindings bindings =
addressManager.getBindingsForRoutingAddress(message.getAddress());
boolean res = false;
@@ -682,12 +688,18 @@
{
RoutingContext context = new RoutingContextImpl(tx);
- boolean routed = bindings.redistribute(message, originatingQueue, context);
+ boolean routed = bindings.redistribute(copyRedistribute, originatingQueue,
context);
if (routed)
{
- processRoute(message, context, false);
+ if (message.isDurable())
+ {
+ storageManager.storeMessageTransactional(tx.getID(), copyRedistribute);
+ tx.setContainsPersistent();
+ }
+ processRoute(copyRedistribute, context, false);
+
res = true;
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-18
04:54:45 UTC (rev 10343)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-03-18
06:09:59 UTC (rev 10344)
@@ -404,7 +404,7 @@
}
catch (HornetQException e)
{
- log.warn("Unable to send message, will try again once bridge
reconnects");
+ log.warn("Unable to send message, will try again once bridge
reconnects", e);
refs.remove(ref);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2011-03-18
04:54:45 UTC (rev 10343)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2011-03-18
06:09:59 UTC (rev 10344)
@@ -127,7 +127,7 @@
if (routed)
{
- doRedistribute(reference, tx);
+ ackRedistribution(reference, tx);
return HandleStatus.HANDLED;
}
@@ -137,7 +137,7 @@
}
}
- private void doRedistribute(final MessageReference reference, final Transaction tx)
throws Exception
+ private void ackRedistribution(final MessageReference reference, final Transaction tx)
throws Exception
{
reference.handled();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-18
04:54:45 UTC (rev 10343)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-03-18
06:09:59 UTC (rev 10344)
@@ -1132,9 +1132,13 @@
do
{
message = consumer.consumer.receive(500);
-
if (message != null)
{
+ if (ack)
+ {
+ message.acknowledge();
+ }
+
int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
ints.add(count);
@@ -1150,6 +1154,12 @@
{
res[j++] = i;
}
+
+ if (ack)
+ {
+ // just to flush acks
+ consumers[consumerID].session.commit();
+ }
return res;
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-03-18
04:54:45 UTC (rev 10343)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-03-18
06:09:59 UTC (rev 10344)
@@ -107,7 +107,6 @@
MessageRedistributionTest.log.info("Test done");
}
- //
https://issues.jboss.org/browse/JBPAPP-6130
//
https://issues.jboss.org/browse/HORNETQ-654
public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
{
@@ -137,7 +136,7 @@
waitForBindings(1, "queues.testaddress", 2, 2, false);
waitForBindings(2, "queues.testaddress", 2, 2, false);
- send(0, "queues.testaddress", 9, true, null);
+ send(0, "queues.testaddress", 20, true, null);
getReceivedOrder(0, true);
int[] ids1 = getReceivedOrder(1, false);
@@ -157,6 +156,9 @@
}
removeConsumer(1);
+
+ // Need to wait some time as we need to handle all redistributions before we stop
the servers
+ Thread.sleep(5000);
for (int i = 0; i <= 2; i++)
{
@@ -181,16 +183,14 @@
setupSessionFactory(2, isNetty());
addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
- waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
waitForBindings(1, "queues.testaddress", 2, 2, false);
- waitForBindings(2, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);