Author: clebert.suconic(a)jboss.com
Date: 2011-09-28 00:58:35 -0400 (Wed, 28 Sep 2011)
New Revision: 11437
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
HORNETQ-783 - fix on messages stuck on paging while clustering
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-09-28
01:35:42 UTC (rev 11436)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2011-09-28
04:58:35 UTC (rev 11437)
@@ -182,13 +182,13 @@
{
addRouteContextToMessage(message);
- List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
+ List<Queue> durableQueuesOnContext =
context.getDurableQueues(storeAndForwardQueue.getAddress());
if (!durableQueuesOnContext.contains(storeAndForwardQueue))
{
// There can be many remote bindings for the same node, we only want to add the
message once to
// the s & f queue for that node
- context.addQueue(address, storeAndForwardQueue);
+ context.addQueue(storeAndForwardQueue.getAddress(), storeAndForwardQueue);
}
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-28
01:35:42 UTC (rev 11436)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-09-28
04:58:35 UTC (rev 11437)
@@ -19,6 +19,7 @@
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
@@ -26,6 +27,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
/**
@@ -123,7 +125,7 @@
removeConsumer(1);
- verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
+ verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
MessageRedistributionTest.log.info("Test done");
}
@@ -274,68 +276,67 @@
servers[0].getAddressSettingsRepository().addMatch("queue0", setting);
servers[1].getAddressSettingsRepository().addMatch("queue0", setting);
servers[1].getAddressSettingsRepository().addMatch("queues.testaddress",
setting);
-
+
startServers(0);
-
+
setupSessionFactory(0, isNetty());
-
+
createQueue(0, "queues.testaddress", "queue0", null, false);
-
+
ClientSession session0 = sfs[0].createSession(false, false, false);
-
+
ClientProducer prod0 = session0.createProducer("queues.testaddress");
-
- for (int i = 0 ; i < 100; i++)
+
+ for (int i = 0; i < 100; i++)
{
ClientMessage msg = session0.createMessage(true);
msg.putIntProperty("key", i);
-
+
byte[] bytes = new byte[24];
-
+
ByteBuffer bb = ByteBuffer.wrap(bytes);
-
+
bb.putLong((long)i);
-
+
msg.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
prod0.send(msg);
-
+
session0.commit();
}
-
+
session0.close();
-
+
session0 = sfs[0].createSession(true, false, false);
ClientConsumer consumer0 = session0.createConsumer("queue0");
-
+
session0.start();
-
+
ArrayList<Xid> xids = new ArrayList<Xid>();
-
- for (int i = 0 ; i < 100; i++)
+
+ for (int i = 0; i < 100; i++)
{
Xid xid = newXID();
-
+
session0.start(xid, XAResource.TMNOFLAGS);
-
+
ClientMessage msg = consumer0.receive(5000);
-
+
msg.acknowledge();
-
+
session0.end(xid, XAResource.TMSUCCESS);
-
+
session0.prepare(xid);
-
+
xids.add(xid);
}
-
+
session0.close();
-
+
sfs[0].close();
sfs[0] = null;
-
-
+
startServers(0, 1, 2);
setupSessionFactory(0, isNetty());
@@ -344,11 +345,11 @@
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
-
+
ClientSession session1 = sfs[1].createSession(false, false);
session1.start();
ClientConsumer consumer1 = session1.createConsumer("queue0");
-
+
waitForBindings(0, "queues.testaddress", 1, 0, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 0, true);
@@ -356,24 +357,23 @@
waitForBindings(0, "queues.testaddress", 2, 1, false);
waitForBindings(1, "queues.testaddress", 2, 0, false);
waitForBindings(2, "queues.testaddress", 2, 1, false);
-
+
session0 = sfs[0].createSession(true, false, false);
-
- for (Xid xid: xids)
+
+ for (Xid xid : xids)
{
session0.rollback(xid);
}
-
-
- for (int i = 0 ; i < 100; i++)
+
+ for (int i = 0; i < 100; i++)
{
ClientMessage msg = consumer1.receive(15000);
assertNotNull(msg);
msg.acknowledge();
}
-
+
session1.commit();
-
+
}
public void testRedistributionWhenConsumerIsClosedQueuesWithFilters() throws
Exception
@@ -834,6 +834,75 @@
verifyNotReceive(0);
}
+ public void testRedistributionWithPagingOnTarget() throws Exception
+ {
+ setupCluster(false);
+
+ AddressSettings as = new AddressSettings();
+ as.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ as.setPageSizeBytes(10000);
+ as.setMaxSizeBytes(20000);
+
+ getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+ getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+ getServer(2).getAddressSettingsRepository().addMatch("queues.*", as);
+
+ startServers(0);
+
+ startServers(1);
+
+ waitForTopology(getServer(0), 2);
+ waitForTopology(getServer(1), 2);
+
+ setupSessionFactory(0, isNetty());
+
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, false);
+
+ getServer(0).getPagingManager().getPageStore(new
SimpleString("queue.testaddress")).startPaging();
+
+ ClientSession session0 = sfs[0].createSession(true, true, 0);
+ ClientProducer producer0 =
session0.createProducer("queues.testaddress");
+
+ ClientConsumer consumer0 = session0.createConsumer("queue0");
+ session0.start();
+
+
+ ClientSession session1 = sfs[1].createSession(true, true, 0);
+ ClientConsumer consumer1 = session1.createConsumer("queue0");
+ session1.start();
+
+
+ for (int i = 0 ; i < 10; i++)
+ {
+ ClientMessage msg = session0.createMessage(true);
+ msg.putIntProperty("i", i);
+ // send two identical messages so they are routed on the cluster
+ producer0.send(msg);
+ producer0.send(msg);
+
+ msg = consumer0.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("i").intValue());
+ // msg.acknowledge(); // -- do not ack message on consumer0, to make sure the
message will be routed
+
+ msg = consumer1.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("i").intValue());
+ msg.acknowledge();
+ }
+
+ session0.close();
+ session1.close();
+ }
+
protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
{
setupClusterConnection("cluster0", "queues",
forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2);