[hornetq-commits] JBoss hornetq SVN: r8222 - in trunk: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 5 08:30:49 EST 2009


Author: ataylor
Date: 2009-11-05 08:30:49 -0500 (Thu, 05 Nov 2009)
New Revision: 8222

Modified:
   trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
test tweaks + fix

Modified: trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-11-05 08:42:21 UTC (rev 8221)
+++ trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-11-05 13:30:49 UTC (rev 8222)
@@ -175,7 +175,6 @@
       {
          SimpleString clusterName = (SimpleString)notification.getProperties()
                                                               .getProperty(ManagementHelper.HDR_CLUSTER_NAME);
-         groupMap.remove(clusterName);
          List<SimpleString> list = groupMap.remove(clusterName);
          if (list != null)
          {

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-11-05 08:42:21 UTC (rev 8221)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java	2009-11-05 13:30:49 UTC (rev 8222)
@@ -16,12 +16,15 @@
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationListener;
+import org.hornetq.core.management.NotificationType;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.core.server.group.impl.Proposal;
 import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.client.management.impl.ManagementHelper;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -49,7 +52,7 @@
       startServers(0, 1, 2);
 
       try
-      {                                                                                     
+      {
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
@@ -100,7 +103,7 @@
       setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
-      
+
       setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
 
       startServers(0, 1, 2);
@@ -623,22 +626,52 @@
          waitForBindings(1, "queues.testaddress", 2, 0, false);
          waitForBindings(2, "queues.testaddress", 2, 1, false);
 
-         sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         sendInRange(1, "queues.testaddress", 0, 10, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-         verifyReceiveAllInRange(0, 10, 0);
+         verifyReceiveAllInRange(true, 0, 10, 0);
 
-         stopServers(1);
+         closeAllConsumers();
 
-         sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
-         startServers(1);
+         final CountDownLatch latch = new CountDownLatch(4);
+         NotificationListener listener = new NotificationListener()
+         {
+            public void onNotification(Notification notification)
+            {
+               if(NotificationType.BINDING_REMOVED == notification.getType())
+               {
+                  if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+                  {
+                     latch.countDown();
+                  }
+               }
+               else  if(NotificationType.BINDING_ADDED == notification.getType())
+               {
+                  if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+                  {
+                     latch.countDown();
+                  }
+               }
+            }
+         };
+         getServer(0).getManagementService().addNotificationListener(listener);
+         getServer(2).getManagementService().addNotificationListener(listener);
 
-         addConsumer(1, 1, "queue0", null);
-         waitForBindings(1, "queues.testaddress", 1, 1, true);
+         stopServers(1);
 
+         startServers(1);
+         assertTrue("timed out waiting for bindings to be removed and added back",latch.await(5, TimeUnit.SECONDS));
+         getServer(0).getManagementService().removeNotificationListener(listener);
+         getServer(2).getManagementService().removeNotificationListener(listener);
+         addConsumer(1, 2, "queue0", null);
 
+         waitForBindings(2, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 2, 1, false);
+         waitForBindings(0, "queues.testaddress", 2, 1, false);
+         sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
          verifyReceiveAllInRange(10, 20, 1);
 
+
          System.out.println("*****************************************************************************");
       }
       finally
@@ -697,17 +730,42 @@
          closeAllConsumers();
 
          sendInRange(2, "queues.testaddress", 10, 20, true, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+         final CountDownLatch latch = new CountDownLatch(4);
+         NotificationListener listener = new NotificationListener()
+         {
+            public void onNotification(Notification notification)
+            {
+               if(NotificationType.BINDING_REMOVED == notification.getType())
+               {
+                  if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+                  {
+                     latch.countDown();
+                  }
+               }
+               else  if(NotificationType.BINDING_ADDED == notification.getType())
+               {
+                  if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+                  {
+                     latch.countDown();
+                  }
+               }
+            }
+         };
+         getServer(0).getManagementService().addNotificationListener(listener);
+         getServer(2).getManagementService().addNotificationListener(listener);
 
-
          stopServers(1);
 
          startServers(1);
-
+         assertTrue("timed out waiting for bindings to be removed and added back",latch.await(5, TimeUnit.SECONDS));
+         getServer(0).getManagementService().removeNotificationListener(listener);
+         getServer(2).getManagementService().removeNotificationListener(listener);
          addConsumer(1, 1, "queue0", null);
 
          waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(0, "queues.testaddress", 2, 1, false);
+         waitForBindings(2, "queues.testaddress", 2, 1, false);
 
-
          verifyReceiveAllInRange(10, 20, 1);
 
 
@@ -766,17 +824,40 @@
          sendInRange(1, "queues.testaddress", 0, 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
          verifyReceiveAllInRange(0, 10, 0);
-
+         final CountDownLatch latch = new CountDownLatch(4);
+         NotificationListener listener = new NotificationListener()
+         {
+            public void onNotification(Notification notification)
+            {
+               if(NotificationType.BINDING_REMOVED == notification.getType())
+               {
+                  if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+                  {
+                     latch.countDown();
+                  }
+               }
+               else  if(NotificationType.BINDING_ADDED == notification.getType())
+               {
+                  if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+                  {
+                     latch.countDown();
+                  }
+               }
+            }
+         };
+         getServer(0).getManagementService().addNotificationListener(listener);
+         getServer(2).getManagementService().addNotificationListener(listener);
          stopServers(1);
 
-
          startServers(1);
-
-         sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
-
+         assertTrue("timed out waiting for bindings to be removed and added back",latch.await(5, TimeUnit.SECONDS));
+         getServer(0).getManagementService().removeNotificationListener(listener);
+         getServer(2).getManagementService().removeNotificationListener(listener);
          addConsumer(1, 1, "queue0", null);
          waitForBindings(1, "queues.testaddress", 1, 1, true);
+         waitForBindings(0, "queues.testaddress", 2, 1, false);
+         waitForBindings(2, "queues.testaddress", 2, 1, false);
+         sendInRange(2, "queues.testaddress", 10, 20, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
 
 
          verifyReceiveAllInRange(10, 20, 1);

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2009-11-05 08:42:21 UTC (rev 8221)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2009-11-05 13:30:49 UTC (rev 8222)
@@ -65,12 +65,12 @@
          createQueue(0, "queues.testaddress", "queue0", null, true);
          createQueue(1, "queues.testaddress", "queue0", null, true);
 
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
+         waitForBindings(1, "queues.testaddress", 1, 0, true);
+         
          addConsumer(0, 0, "queue0", null);
          addConsumer(1, 1, "queue0", null);
 
-         waitForBindings(0, "queues.testaddress", 1, 1, true);
-         waitForBindings(1, "queues.testaddress", 1, 1, true);
-
          waitForBindings(0, "queues.testaddress", 1, 1, false);
          waitForBindings(1, "queues.testaddress", 1, 1, false);
 
@@ -168,11 +168,13 @@
          createQueue(0, "queues.testaddress", "queue0", null, true);
          createQueue(1, "queues.testaddress", "queue0", null, true);
 
+
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
+         waitForBindings(1, "queues.testaddress", 1, 0, true);
+
          addConsumer(0, 0, "queue0", null);
          addConsumer(1, 1, "queue0", null);
 
-         waitForBindings(0, "queues.testaddress", 1, 1, true);
-         waitForBindings(1, "queues.testaddress", 1, 1, true);
 
          waitForBindings(0, "queues.testaddress", 1, 1, false);
          waitForBindings(1, "queues.testaddress", 1, 1, false);



More information about the hornetq-commits mailing list