[hornetq-commits] JBoss hornetq SVN: r10344 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Mar 18 02:09:59 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-18 02:09:59 -0400 (Fri, 18 Mar 2011)
New Revision: 10344

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6130 / https://issues.jboss.org/browse/HORNETQ-654 - fixing Redistribution issue over restart 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-18 04:54:45 UTC (rev 10343)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-03-18 06:09:59 UTC (rev 10344)
@@ -674,6 +674,12 @@
 
    public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
    {
+
+      // We have to copy the message and store it separately; otherwise we may lose remote bindings if the server
+      // is restarted before the message arrives at the target node,
+      // as described on https://issues.jboss.org/browse/JBPAPP-6130
+      ServerMessage copyRedistribute = message.copy(storageManager.generateUniqueID());
+      
       Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress());
 
       boolean res = false;
@@ -682,12 +688,18 @@
       {
          RoutingContext context = new RoutingContextImpl(tx);
 
-         boolean routed = bindings.redistribute(message, originatingQueue, context);
+         boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);
 
          if (routed)
          {
-            processRoute(message, context, false);
+            if (message.isDurable())
+            {
+               storageManager.storeMessageTransactional(tx.getID(), copyRedistribute);
+               tx.setContainsPersistent();
+            }
 
+            processRoute(copyRedistribute, context, false);
+
             res = true;
          }
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-18 04:54:45 UTC (rev 10343)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-03-18 06:09:59 UTC (rev 10344)
@@ -404,7 +404,7 @@
          }
          catch (HornetQException e)
          {
-            log.warn("Unable to send message, will try again once bridge reconnects");
+            log.warn("Unable to send message, will try again once bridge reconnects", e);
 
             refs.remove(ref);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2011-03-18 04:54:45 UTC (rev 10343)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2011-03-18 06:09:59 UTC (rev 10344)
@@ -127,7 +127,7 @@
 
       if (routed)
       {
-         doRedistribute(reference, tx);
+         ackRedistribution(reference, tx);
 
          return HandleStatus.HANDLED;
       }
@@ -137,7 +137,7 @@
       }
    }
 
-   private void doRedistribute(final MessageReference reference, final Transaction tx) throws Exception
+   private void ackRedistribution(final MessageReference reference, final Transaction tx) throws Exception
    {
       reference.handled();
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-03-18 04:54:45 UTC (rev 10343)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-03-18 06:09:59 UTC (rev 10344)
@@ -1132,9 +1132,13 @@
       do
       {
          message = consumer.consumer.receive(500);
-
          if (message != null)
          {
+            if (ack)
+            {
+               message.acknowledge();
+            }
+
             int count = (Integer)message.getObjectProperty(ClusterTestBase.COUNT_PROP);
 
             ints.add(count);
@@ -1150,6 +1154,12 @@
       {
          res[j++] = i;
       }
+      
+      if (ack)
+      {
+         // just to flush acks
+         consumers[consumerID].session.commit();
+      }
 
       return res;
    }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-03-18 04:54:45 UTC (rev 10343)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java	2011-03-18 06:09:59 UTC (rev 10344)
@@ -107,7 +107,6 @@
       MessageRedistributionTest.log.info("Test done");
    }
 
-   // https://issues.jboss.org/browse/JBPAPP-6130
    // https://issues.jboss.org/browse/HORNETQ-654
    public void testRedistributionWhenConsumerIsClosedAndRestart() throws Exception
    {
@@ -137,7 +136,7 @@
       waitForBindings(1, "queues.testaddress", 2, 2, false);
       waitForBindings(2, "queues.testaddress", 2, 2, false);
 
-      send(0, "queues.testaddress", 9, true, null);
+      send(0, "queues.testaddress", 20, true, null);
 
       getReceivedOrder(0, true);
       int[] ids1 = getReceivedOrder(1, false);
@@ -157,6 +156,9 @@
       }
 
       removeConsumer(1);
+      
+      // Need to wait some time as we need to handle all redistributions before we stop the servers
+      Thread.sleep(5000);
 
       for (int i = 0; i <= 2; i++)
       {
@@ -181,16 +183,14 @@
       setupSessionFactory(2, isNetty());
 
       addConsumer(0, 0, "queue0", null);
-      addConsumer(1, 1, "queue0", null);
       addConsumer(2, 2, "queue0", null);
 
       waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(1, "queues.testaddress", 1, 1, true);
       waitForBindings(2, "queues.testaddress", 1, 1, true);
 
-      waitForBindings(0, "queues.testaddress", 2, 2, false);
+      waitForBindings(0, "queues.testaddress", 2, 1, false);
       waitForBindings(1, "queues.testaddress", 2, 2, false);
-      waitForBindings(2, "queues.testaddress", 2, 2, false);
+      waitForBindings(2, "queues.testaddress", 2, 1, false);
 
       verifyReceiveRoundRobinInSomeOrderWithCounts(false, ids1, 0, 2);
 



More information about the hornetq-commits mailing list