[hornetq-commits] JBoss hornetq SVN: r11437 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 28 00:58:35 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-09-28 00:58:35 -0400 (Wed, 28 Sep 2011)
New Revision: 11437

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
HORNETQ-783 - fix for messages getting stuck in paging when clustering

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2011-09-28 01:35:42 UTC (rev 11436)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2011-09-28 04:58:35 UTC (rev 11437)
@@ -182,13 +182,13 @@
    {
       addRouteContextToMessage(message);
 
-      List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
+      List<Queue> durableQueuesOnContext = context.getDurableQueues(storeAndForwardQueue.getAddress());
 
       if (!durableQueuesOnContext.contains(storeAndForwardQueue))
       {
          // There can be many remote bindings for the same node, we only want to add the message once to
          // the s & f queue for that node
-         context.addQueue(address, storeAndForwardQueue);
+         context.addQueue(storeAndForwardQueue.getAddress(), storeAndForwardQueue);
       }
    }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-28 01:35:42 UTC (rev 11436)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-09-28 04:58:35 UTC (rev 11437)
@@ -19,6 +19,7 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
@@ -26,6 +27,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.server.impl.QueueImpl;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 
 /**
@@ -123,7 +125,7 @@
 
       removeConsumer(1);
 
-      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0,  2);
+      verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
 
       MessageRedistributionTest.log.info("Test done");
    }
@@ -274,68 +276,67 @@
       servers[0].getAddressSettingsRepository().addMatch("queue0", setting);
       servers[1].getAddressSettingsRepository().addMatch("queue0", setting);
       servers[1].getAddressSettingsRepository().addMatch("queues.testaddress", setting);
-      
+
       startServers(0);
-      
+
       setupSessionFactory(0, isNetty());
-      
+
       createQueue(0, "queues.testaddress", "queue0", null, false);
-      
+
       ClientSession session0 = sfs[0].createSession(false, false, false);
-      
+
       ClientProducer prod0 = session0.createProducer("queues.testaddress");
-      
-      for (int i = 0 ; i < 100; i++)
+
+      for (int i = 0; i < 100; i++)
       {
          ClientMessage msg = session0.createMessage(true);
          msg.putIntProperty("key", i);
-         
+
          byte[] bytes = new byte[24];
-         
+
          ByteBuffer bb = ByteBuffer.wrap(bytes);
-         
+
          bb.putLong((long)i);
-         
+
          msg.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
 
          prod0.send(msg);
-         
+
          session0.commit();
       }
-      
+
       session0.close();
-      
+
       session0 = sfs[0].createSession(true, false, false);
 
       ClientConsumer consumer0 = session0.createConsumer("queue0");
-      
+
       session0.start();
-      
+
       ArrayList<Xid> xids = new ArrayList<Xid>();
-      
-      for (int i = 0 ; i < 100; i++)
+
+      for (int i = 0; i < 100; i++)
       {
          Xid xid = newXID();
-         
+
          session0.start(xid, XAResource.TMNOFLAGS);
-         
+
          ClientMessage msg = consumer0.receive(5000);
-         
+
          msg.acknowledge();
-         
+
          session0.end(xid, XAResource.TMSUCCESS);
-         
+
          session0.prepare(xid);
-         
+
          xids.add(xid);
       }
-      
+
       session0.close();
-      
+
       sfs[0].close();
       sfs[0] = null;
-      
-      
+
       startServers(0, 1, 2);
 
       setupSessionFactory(0, isNetty());
@@ -344,11 +345,11 @@
 
       createQueue(1, "queues.testaddress", "queue0", null, false);
       createQueue(2, "queues.testaddress", "queue0", null, false);
-      
+
       ClientSession session1 = sfs[1].createSession(false, false);
       session1.start();
       ClientConsumer consumer1 = session1.createConsumer("queue0");
-      
+
       waitForBindings(0, "queues.testaddress", 1, 0, true);
       waitForBindings(1, "queues.testaddress", 1, 1, true);
       waitForBindings(2, "queues.testaddress", 1, 0, true);
@@ -356,24 +357,23 @@
       waitForBindings(0, "queues.testaddress", 2, 1, false);
       waitForBindings(1, "queues.testaddress", 2, 0, false);
       waitForBindings(2, "queues.testaddress", 2, 1, false);
-      
+
       session0 = sfs[0].createSession(true, false, false);
-      
-      for (Xid xid: xids)
+
+      for (Xid xid : xids)
       {
          session0.rollback(xid);
       }
-      
-      
-      for (int i = 0 ; i < 100; i++)
+
+      for (int i = 0; i < 100; i++)
       {
          ClientMessage msg = consumer1.receive(15000);
          assertNotNull(msg);
          msg.acknowledge();
       }
-      
+
       session1.commit();
-      
+
    }
 
    public void testRedistributionWhenConsumerIsClosedQueuesWithFilters() throws Exception
@@ -834,6 +834,75 @@
       verifyNotReceive(0);
    }
 
+   /**
+    * Sends pairs of messages to a clustered address that is paging on server 0 and checks that one
+    * copy is received locally and the other by the consumer attached to server 1 (HORNETQ-783).
+    */
+   public void testRedistributionWithPagingOnTarget() throws Exception
+   {
+      setupCluster(false);
+
+      AddressSettings as = new AddressSettings();
+      as.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      as.setPageSizeBytes(10000);
+      as.setMaxSizeBytes(20000);
+
+      getServer(0).getAddressSettingsRepository().addMatch("queues.*", as);
+      getServer(1).getAddressSettingsRepository().addMatch("queues.*", as);
+      getServer(2).getAddressSettingsRepository().addMatch("queues.*", as);
+
+      startServers(0);
+      startServers(1);
+
+      waitForTopology(getServer(0), 2);
+      waitForTopology(getServer(1), 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+
+      waitForBindings(1, "queues.testaddress", 1, 0, true);
+      waitForBindings(0, "queues.testaddress", 1, 0, false);
+
+      // Start paging on the address the queues are actually bound to ("queues.testaddress");
+      // a page store looked up under any other name would leave the tested address untouched.
+      getServer(0).getPagingManager().getPageStore(new SimpleString("queues.testaddress")).startPaging();
+
+      ClientSession session0 = sfs[0].createSession(true, true, 0);
+      ClientProducer producer0 = session0.createProducer("queues.testaddress");
+      ClientConsumer consumer0 = session0.createConsumer("queue0");
+      session0.start();
+
+      ClientSession session1 = sfs[1].createSession(true, true, 0);
+      ClientConsumer consumer1 = session1.createConsumer("queue0");
+      session1.start();
+
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage msg = session0.createMessage(true);
+         msg.putIntProperty("i", i);
+         // send two identical messages so they are routed on the cluster
+         producer0.send(msg);
+         producer0.send(msg);
+
+         // deliberately not acknowledged on consumer0, to make sure the message will be routed
+         msg = consumer0.receive(5000);
+         assertNotNull(msg);
+         assertEquals(i, msg.getIntProperty("i").intValue());
+
+         // the second copy must arrive at the consumer connected to server 1
+         msg = consumer1.receive(5000);
+         assertNotNull(msg);
+         assertEquals(i, msg.getIntProperty("i").intValue());
+         msg.acknowledge();
+      }
+
+      session0.close();
+      session1.close();
+   }
+
    protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception
    {
       setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2);



More information about the hornetq-commits mailing list