Author: ataylor
Date: 2009-11-05 08:30:49 -0500 (Thu, 05 Nov 2009)
New Revision: 8222
Modified:
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
test tweaks + fix
Modified: trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-11-05
08:42:21 UTC (rev 8221)
+++
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-11-05
13:30:49 UTC (rev 8222)
@@ -175,7 +175,6 @@
{
SimpleString clusterName = (SimpleString)notification.getProperties()
.getProperty(ManagementHelper.HDR_CLUSTER_NAME);
- groupMap.remove(clusterName);
List<SimpleString> list = groupMap.remove(clusterName);
if (list != null)
{
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-11-05
08:42:21 UTC (rev 8221)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-11-05
13:30:49 UTC (rev 8222)
@@ -16,12 +16,15 @@
import java.util.concurrent.TimeUnit;
import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.NotificationListener;
+import org.hornetq.core.management.NotificationType;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.utils.SimpleString;
/**
@@ -49,7 +52,7 @@
startServers(0, 1, 2);
try
- {
+ {
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -100,7 +103,7 @@
setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1);
-
+
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2, 1);
startServers(0, 1, 2);
@@ -623,22 +626,52 @@
waitForBindings(1, "queues.testaddress", 2, 0, false);
waitForBindings(2, "queues.testaddress", 2, 1, false);
- sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ sendInRange(1, "queues.testaddress", 0, 10, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- verifyReceiveAllInRange(0, 10, 0);
+ verifyReceiveAllInRange(true, 0, 10, 0);
- stopServers(1);
+ closeAllConsumers();
- sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
- startServers(1);
+ final CountDownLatch latch = new CountDownLatch(4);
+ NotificationListener listener = new NotificationListener()
+ {
+ public void onNotification(Notification notification)
+ {
+ if(NotificationType.BINDING_REMOVED == notification.getType())
+ {
+
if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+ {
+ latch.countDown();
+ }
+ }
+ else if(NotificationType.BINDING_ADDED == notification.getType())
+ {
+
if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+ {
+ latch.countDown();
+ }
+ }
+ }
+ };
+ getServer(0).getManagementService().addNotificationListener(listener);
+ getServer(2).getManagementService().addNotificationListener(listener);
- addConsumer(1, 1, "queue0", null);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
+ stopServers(1);
+ startServers(1);
+ assertTrue("timed out waiting for bindings to be removed and added
back",latch.await(5, TimeUnit.SECONDS));
+ getServer(0).getManagementService().removeNotificationListener(listener);
+ getServer(2).getManagementService().removeNotificationListener(listener);
+ addConsumer(1, 2, "queue0", null);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 2, 1, false);
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ sendInRange(2, "queues.testaddress", 10, 20, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAllInRange(10, 20, 1);
+
System.out.println("*****************************************************************************");
}
finally
@@ -697,17 +730,42 @@
closeAllConsumers();
sendInRange(2, "queues.testaddress", 10, 20, true,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+ final CountDownLatch latch = new CountDownLatch(4);
+ NotificationListener listener = new NotificationListener()
+ {
+ public void onNotification(Notification notification)
+ {
+ if(NotificationType.BINDING_REMOVED == notification.getType())
+ {
+
if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+ {
+ latch.countDown();
+ }
+ }
+ else if(NotificationType.BINDING_ADDED == notification.getType())
+ {
+
if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+ {
+ latch.countDown();
+ }
+ }
+ }
+ };
+ getServer(0).getManagementService().addNotificationListener(listener);
+ getServer(2).getManagementService().addNotificationListener(listener);
-
stopServers(1);
startServers(1);
-
+ assertTrue("timed out waiting for bindings to be removed and added
back",latch.await(5, TimeUnit.SECONDS));
+ getServer(0).getManagementService().removeNotificationListener(listener);
+ getServer(2).getManagementService().removeNotificationListener(listener);
addConsumer(1, 1, "queue0", null);
waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
-
verifyReceiveAllInRange(10, 20, 1);
@@ -766,17 +824,40 @@
sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAllInRange(0, 10, 0);
-
+ final CountDownLatch latch = new CountDownLatch(4);
+ NotificationListener listener = new NotificationListener()
+ {
+ public void onNotification(Notification notification)
+ {
+ if(NotificationType.BINDING_REMOVED == notification.getType())
+ {
+
if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+ {
+ latch.countDown();
+ }
+ }
+ else if(NotificationType.BINDING_ADDED == notification.getType())
+ {
+
if(notification.getProperties().getProperty(ManagementHelper.HDR_ADDRESS).toString().equals("queues.testaddress"))
+ {
+ latch.countDown();
+ }
+ }
+ }
+ };
+ getServer(0).getManagementService().addNotificationListener(listener);
+ getServer(2).getManagementService().addNotificationListener(listener);
stopServers(1);
-
startServers(1);
-
- sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
-
-
+ assertTrue("timed out waiting for bindings to be removed and added
back",latch.await(5, TimeUnit.SECONDS));
+ getServer(0).getManagementService().removeNotificationListener(listener);
+ getServer(2).getManagementService().removeNotificationListener(listener);
addConsumer(1, 1, "queue0", null);
waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAllInRange(10, 20, 1);
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-11-05
08:42:21 UTC (rev 8221)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-11-05
13:30:49 UTC (rev 8222)
@@ -65,12 +65,12 @@
createQueue(0, "queues.testaddress", "queue0", null, true);
createQueue(1, "queues.testaddress", "queue0", null, true);
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
-
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
@@ -168,11 +168,13 @@
createQueue(0, "queues.testaddress", "queue0", null, true);
createQueue(1, "queues.testaddress", "queue0", null, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);