[hornetq-commits] JBoss hornetq SVN: r11790 - in branches/HORNETQ-316: hornetq-core/src/main/java/org/hornetq/core/deployers/impl and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Nov 29 21:41:19 EST 2011
Author: igarashitm
Date: 2011-11-29 21:41:19 -0500 (Tue, 29 Nov 2011)
New Revision: 11790
Added:
branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
Modified:
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java
branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/HORNETQ-316/tests/integration-tests/pom.xml
Log:
https://issues.jboss.org/browse/HORNETQ-316
-Added testcases for JGroups Discovery
-Added JGroupsBroadcastGroupImpl and some fixes
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java 2011-11-30 00:18:43 UTC (rev 11789)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/config/BroadcastGroupConstants.java 2011-11-30 02:41:19 UTC (rev 11790)
@@ -23,6 +23,7 @@
public class BroadcastGroupConstants
{
// for simple UDP broadcast
+ public static final String UDP_BROADCAST_GROUP_CLASS = "org.hornetq.core.server.cluster.impl.BroadcastGroupImpl";
public static final String LOCAL_BIND_ADDRESS_NAME = "local-bind-address";
public static final String LOCAL_BIND_PORT_NAME = "local-bind-port";
public static final String GROUP_ADDRESS_NAME = "group-address";
Modified: branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-30 00:18:43 UTC (rev 11789)
+++ branches/HORNETQ-316/hornetq-core/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2011-11-30 02:41:19 UTC (rev 11790)
@@ -32,6 +32,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.config.BroadcastGroupConstants;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.ConnectorServiceConfiguration;
@@ -925,7 +926,7 @@
switch (BroadcastType.valueOf(type))
{
case UDP:
- clazz = "org.hornetq.core.server.cluster.impl.BroadcastGroupImpl";
+ clazz = BroadcastGroupConstants.UDP_BROADCAST_GROUP_CLASS;
break;
case JGROUPS:
clazz = "org.hornetq.integration.discovery.jgroups.JGroupsBroadcastGroupImpl";
Added: branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java (rev 0)
+++ branches/HORNETQ-316/hornetq-jgroups-discovery/src/main/java/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.ConfigurationHelper;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUIDGenerator;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+
+/**
+ * Broadcasts this node's connectors over a JGroups channel.
+ * <p>
+ * {@link #start()} builds the channel from the classpath resource named by the
+ * {@code JGROUPS_CONFIGURATION_FILE_NAME} parameter and connects it to the cluster named
+ * by {@code JGROUPS_CHANNEL_NAME_NAME}; {@link #schedule} makes the broadcast periodic.
+ *
+ * @author <a href="mailto:tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ */
+public class JGroupsBroadcastGroupImpl implements BroadcastGroup, Runnable
+{
+   private static final Logger log = Logger.getLogger(JGroupsBroadcastGroupImpl.class);
+
+   private final String nodeID;
+
+   private final String name;
+
+   private final BroadcastGroupConfiguration broadcastGroupConfiguration;
+
+   private final List<TransportConfiguration> connectors;
+
+   private String jgroupsConfigurationFileName;
+
+   private String jgroupsChannelName = null;
+
+   // volatile: stop() clears it while the scheduler thread may be inside run()
+   private volatile JChannel broadcastChannel;
+
+   // volatile: written by start()/stop(), read by the scheduler thread in run()
+   private volatile boolean started;
+
+   private ScheduledFuture<?> future;
+
+   // volatile: written by activate(), read by the scheduler thread when broadcasting
+   private volatile boolean active;
+
+   // Each broadcast group has a unique id - we use this to detect when more than one group broadcasts the same node id
+   // on the network which would be an error
+   private final String uniqueID;
+
+   private NotificationService notificationService;
+
+   /**
+    * @param nodeID id of the node; written first into every broadcast
+    * @param name name of this group; reported in the start/stop notifications
+    * @param active whether {@link #broadcastConnectors()} actually sends
+    * @param config supplies the connector list and the JGroups parameters
+    */
+   public JGroupsBroadcastGroupImpl(final String nodeID,
+                                    final String name,
+                                    final boolean active,
+                                    final BroadcastGroupConfiguration config)
+   {
+      this.nodeID = nodeID;
+      this.name = name;
+      this.active = active;
+      this.broadcastGroupConfiguration = config;
+      this.connectors = config.getConnectorList();
+      this.uniqueID = UUIDGenerator.getInstance().generateStringUUID();
+   }
+
+   public void setNotificationService(NotificationService notificationService)
+   {
+      this.notificationService = notificationService;
+   }
+
+   /**
+    * Creates the channel from the configured classpath resource, connects it and sends a
+    * BROADCAST_GROUP_STARTED notification. Does nothing when already started.
+    *
+    * @throws IllegalStateException if the configuration file parameter is missing or the
+    *         resource cannot be found by the context class loader
+    * @throws Exception if the channel cannot be created or connected
+    */
+   public void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
+      this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
+      this.jgroupsChannelName = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, BroadcastGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME, params);
+      if (this.jgroupsConfigurationFileName == null)
+      {
+         throw new IllegalStateException("Broadcast group " + name + " has no " + BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME + " parameter");
+      }
+      URL configURL = Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName);
+      if (configURL == null)
+      {
+         throw new IllegalStateException("JGroups configuration " + this.jgroupsConfigurationFileName + " not found on the classpath");
+      }
+
+      JChannel channel = new JChannel(configURL);
+      try
+      {
+         channel.connect(this.jgroupsChannelName);
+      }
+      catch (Exception e)
+      {
+         // connect failed: release the channel instead of leaking it
+         channel.close();
+         throw e;
+      }
+      this.broadcastChannel = channel;
+      started = true;
+
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+         Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STARTED, props);
+         notificationService.sendNotification(notification);
+      }
+   }
+
+   /**
+    * Cancels the periodic broadcast, closes the channel and sends a BROADCAST_GROUP_STOPPED
+    * notification; a failure to notify is only logged. Does nothing when not started.
+    */
+   public void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      if (future != null)
+      {
+         future.cancel(false);
+         future = null;
+      }
+
+      JChannel channel = broadcastChannel;
+      if (channel != null)
+      {
+         channel.shutdown();
+         channel.close();
+         broadcastChannel = null;
+      }
+
+      started = false;
+
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+         Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STOPPED, props);
+         try
+         {
+            notificationService.sendNotification(notification);
+         }
+         catch (Exception e)
+         {
+            JGroupsBroadcastGroupImpl.log.warn("unable to send notification when broadcast group is stopped", e);
+         }
+      }
+   }
+
+   public boolean isStarted()
+   {
+      return this.started;
+   }
+
+   public String getName()
+   {
+      return this.name;
+   }
+
+   public void addConnector(TransportConfiguration tcConfig)
+   {
+      this.connectors.add(tcConfig);
+   }
+
+   public void removeConnector(TransportConfiguration tcConfig)
+   {
+      this.connectors.remove(tcConfig);
+   }
+
+   public int size()
+   {
+      return this.connectors.size();
+   }
+
+   public void activate()
+   {
+      this.active = true;
+   }
+
+   /**
+    * Sends nodeID, uniqueID, the connector count and each encoded connector as one message.
+    * Does nothing while the group is inactive or has no channel (not started, or stopped).
+    */
+   public void broadcastConnectors() throws Exception
+   {
+      // snapshot the field: stop() may null it while the scheduler thread is in here
+      JChannel channel = this.broadcastChannel;
+      if (!active || channel == null)
+      {
+         return;
+      }
+
+      HornetQBuffer buff = HornetQBuffers.dynamicBuffer(4096);
+      buff.writeString(nodeID);
+      buff.writeString(uniqueID);
+      buff.writeInt(connectors.size());
+      for (TransportConfiguration tcConfig : connectors)
+      {
+         tcConfig.encode(buff);
+      }
+
+      // NOTE(review): array() may hand over the whole backing array rather than only the
+      // bytes written above - confirm, and consider sending just the written range
+      byte[] data = buff.toByteBuffer().array();
+      Message msg = new Message();
+      msg.setBuffer(data);
+      channel.send(msg);
+   }
+
+   /** Scheduler entry point: broadcasts once if started; failures are logged, not thrown. */
+   public void run()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      try
+      {
+         broadcastConnectors();
+      }
+      catch (Exception e)
+      {
+         JGroupsBroadcastGroupImpl.log.error("Failed to broadcast connector configs", e);
+      }
+   }
+
+   /**
+    * Runs this group on the scheduler: first run immediately, then with a fixed delay of
+    * {@code BROADCAST_PERIOD_NAME} milliseconds between runs.
+    *
+    * @param scheduler executor that will invoke {@link #run()}
+    * @throws NumberFormatException if the period parameter is missing or not a number
+    */
+   public void schedule(ScheduledExecutorService scheduler)
+   {
+      Map<String,Object> params = broadcastGroupConfiguration.getParams();
+      Object period = params == null ? null : params.get(BroadcastGroupConstants.BROADCAST_PERIOD_NAME);
+      if (period == null)
+      {
+         throw new NumberFormatException("Broadcast group " + name + " has no " + BroadcastGroupConstants.BROADCAST_PERIOD_NAME + " parameter");
+      }
+      // accept a Number as well as the String form the previous hard cast required
+      long periodMillis = period instanceof Number ? ((Number)period).longValue() : Long.parseLong(period.toString());
+      this.future = scheduler.scheduleWithFixedDelay(this, 0L, periodMillis, TimeUnit.MILLISECONDS);
+   }
+}
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir"/> <!-- the _2 and _3 test stacks use file_ping_dir_2 / file_ping_dir_3 instead; presumably so the stacks cannot find each other (confirm) -->
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir_2"/> <!-- the other test stacks use file_ping_dir / file_ping_dir_3 instead; presumably so the stacks cannot find each other (confirm) -->
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir_3"/> <!-- the other test stacks use file_ping_dir / file_ping_dir_2 instead; presumably so the stacks cannot find each other (confirm) -->
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Modified: branches/HORNETQ-316/tests/integration-tests/pom.xml
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/pom.xml 2011-11-30 00:18:43 UTC (rev 11789)
+++ branches/HORNETQ-316/tests/integration-tests/pom.xml 2011-11-30 02:41:19 UTC (rev 11790)
@@ -65,6 +65,11 @@
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
+ <artifactId>hornetq-jgroups-discovery</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hornetq</groupId>
<artifactId>hornetq-journal</artifactId>
<version>${project.version}</version>
</dependency>
Added: branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
===================================================================
--- branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java (rev 0)
+++ branches/HORNETQ-316/tests/integration-tests/src/test/java/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java 2011-11-30 02:41:19 UTC (rev 11790)
@@ -0,0 +1,719 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.discovery;
+
+ import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.integration.discovery.jgroups.BroadcastGroupConstants;
+import org.hornetq.integration.discovery.jgroups.DiscoveryGroupConstants;
+import org.hornetq.integration.discovery.jgroups.JGroupsBroadcastGroupImpl;
+import org.hornetq.integration.discovery.jgroups.JGroupsDiscoveryGroupImpl;
+import org.hornetq.tests.integration.SimpleNotificationService;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A JGroupsDiscoveryTest: discovery between JGroupsBroadcastGroupImpl and JGroupsDiscoveryGroupImpl.
+ *
+ * @author <a href="mailto:tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ *
+ */
+public class JGroupsDiscoveryTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(JGroupsDiscoveryTest.class);
+ // JGroups channel names; tests that need isolated groups use one name per group
+ private static final String channelName = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME;
+
+ private static final String channelName2 = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "2";
+
+ private static final String channelName3 = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "3";
+ // JGroups stack definitions, resolved as classpath resources by the tests
+ private static final String config = "test-jgroups-file_ping.xml";
+
+ private static final String config2 = "test-jgroups-file_ping_2.xml";
+
+ private static final String config3 = "test-jgroups-file_ping_3.xml";
+
+ public void testSimpleBroadcast() throws Exception
+ {
+ final long timeout = 500;
+ // One broadcast must reach a discovery group on the same channel and carry exactly live1.
+ final String nodeID = RandomUtil.randomString();
+ // broadcast group publishing the single connector live1 on channelName
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+ // discovery group on the same channel and stack; first argument is presumably its node id (random here) - confirm
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(10000);
+
+ Assert.assertTrue(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+ // NOTE(review): the stops below are skipped when an assertion above fails - consider try/finally
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+ public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
+ {
+ final int timeout = 500;
+ // Discovery must work again after both groups have been stopped and restarted.
+ final String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+ // first round: a single broadcast is discovered
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ Assert.assertTrue(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+ // second round: restart both groups and expect the same entry to be discovered again
+ dg.start();
+
+ bg.start();
+
+ bg.broadcastConnectors();
+
+ ok = dg.waitForBroadcast(1000);
+
+ Assert.assertTrue(ok);
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+ // NOTE(review): the stops below are skipped when an assertion above fails - consider try/finally
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testIgnoreTrafficFromOwnNode() throws Exception
+ {
+ final int timeout = 500;
+ // A broadcast that carries the discovery group's own node id must not produce any entry.
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+ // the discovery group is given the same nodeID as the broadcast group
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(nodeID,
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+ // expect the wait to time out: nothing should be reported
+ boolean ok = dg.waitForBroadcast(1000);
+
+ Assert.assertFalse(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+
+ Assert.assertNotNull(entries);
+
+ Assert.assertEquals(0, entries.size());
+ // NOTE(review): the stops below are skipped when an assertion above fails - consider try/finally
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+
+ public void testMultipleGroups() throws Exception
+ {
+ final int timeout = 500;
+ // Three broadcast/discovery pairs, each with its own channel name and stack config, must each see only their own connector.
+ String node1 = RandomUtil.randomString();
+
+ String node2 = RandomUtil.randomString();
+
+ String node3 = RandomUtil.randomString();
+ // pair 1 broadcaster: config / channelName
+ Map<String,Object> params1 = new HashMap<String,Object>();
+ params1.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params1.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+ connectors1.add(live1);
+ BroadcastGroupConfiguration broadcastConf1 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params1, connectors1);
+ BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(), true, broadcastConf1);
+ bg1.start();
+ // pair 2 broadcaster: config2 / channelName2
+ Map<String,Object> params2 = new HashMap<String,Object>();
+ params2.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config2);
+ params2.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName2);
+ TransportConfiguration live2 = generateTC();
+ List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+ connectors2.add(live2);
+ BroadcastGroupConfiguration broadcastConf2 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params2, connectors2);
+ BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(), true, broadcastConf2);
+ bg2.start();
+ // pair 3 broadcaster: config3 / channelName3
+ Map<String,Object> params3 = new HashMap<String,Object>();
+ params3.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config3);
+ params3.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName3);
+ TransportConfiguration live3 = generateTC();
+ List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+ connectors3.add(live3);
+ BroadcastGroupConfiguration broadcastConf3 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params3, connectors3);
+ BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(), true, broadcastConf3);
+ bg3.start();
+ // one discovery group per pair, using the matching channel name and stack config
+ DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+ dg1.start();
+
+ DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName2,
+ Thread.currentThread().getContextClassLoader().getResource(config2),
+ timeout);
+ dg2.start();
+
+ DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName3,
+ Thread.currentThread().getContextClassLoader().getResource(config3),
+ timeout);
+ dg3.start();
+
+ bg1.broadcastConnectors();
+
+ bg2.broadcastConnectors();
+
+ bg3.broadcastConnectors();
+ // each discovery group must report exactly the connector of its own pair
+ boolean ok = dg1.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ ok = dg2.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg2.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live2), entries);
+
+ ok = dg3.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg3.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live3), entries);
+ // NOTE(review): the stops below are skipped when an assertion above fails - consider try/finally
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg1.stop();
+ dg2.stop();
+ dg3.stop();
+ }
+
+ public void testDiscoveryListenersCalled() throws Exception
+ {
+ final int timeout = 500;
+ // Every registered listener is flagged on the first broadcast, but not on an identical repeat.
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+ // listeners are registered before the discovery group is started
+ MyListener listener1 = new MyListener();
+ MyListener listener2 = new MyListener();
+ MyListener listener3 = new MyListener();
+
+ dg.registerListener(listener1);
+ dg.registerListener(listener2);
+ dg.registerListener(listener3);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ Assert.assertTrue(listener3.called);
+ // reset the flags so the repeat broadcast can be checked independently
+ listener1.called = false;
+ listener2.called = false;
+ listener3.called = false;
+
+ bg.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ // Won't be called since connectors haven't changed
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ Assert.assertFalse(listener3.called);
+ // NOTE(review): the stops below are skipped when an assertion above fails - consider try/finally
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
+ {
+ final int timeout = 500;
+
+ String node1 = RandomUtil.randomString();
+ String node2 = RandomUtil.randomString();
+ String node3 = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+ connectors1.add(live1);
+ BroadcastGroupConfiguration broadcastConf1 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors1);
+ BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(), true, broadcastConf1);
+ bg1.start();
+
+ TransportConfiguration live2 = generateTC();
+ List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+ connectors2.add(live2);
+ BroadcastGroupConfiguration broadcastConf2 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors2);
+ BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(), true, broadcastConf2);
+ bg2.start();
+
+ TransportConfiguration live3 = generateTC();
+ List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+ connectors3.add(live3);
+ BroadcastGroupConfiguration broadcastConf3 = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors3);
+ BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(), true, broadcastConf3);
+ bg3.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ MyListener listener1 = new MyListener();
+ dg.registerListener(listener1);
+ MyListener listener2 = new MyListener();
+ dg.registerListener(listener2);
+
+ dg.start();
+
+ bg1.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.removeConnector(live2);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ // Connector2 should still be there since not timed out yet
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live3), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.removeConnector(live1);
+ bg3.removeConnector(live3);
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ Assert.assertNotNull(entries);
+ Assert.assertEquals(0, entries.size());
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ Assert.assertNotNull(entries);
+ Assert.assertEquals(0, entries.size());
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg.stop();
+ }
+
+ /**
+  * Starts one broadcast group and three discovery groups that all use the same
+  * channel name and configuration resource, broadcasts once, and checks that each
+  * discovery group reports exactly the broadcast connector.
+  *
+  * @throws Exception if a group cannot be started or stopped
+  */
+ public void testMultipleDiscoveryGroups() throws Exception
+ {
+    final int timeout = 500;
+
+    String nodeID = RandomUtil.randomString();
+
+    Map<String,Object> params = new HashMap<String,Object>();
+    params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+    params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+    TransportConfiguration live1 = generateTC();
+    List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+    connectors.add(live1);
+    BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), params, connectors);
+    BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+    bg.start();
+
+    DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                       RandomUtil.randomString(),
+                                                       channelName,
+                                                       Thread.currentThread().getContextClassLoader().getResource(config),
+                                                       timeout);
+
+    DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                       RandomUtil.randomString(),
+                                                       channelName,
+                                                       Thread.currentThread().getContextClassLoader().getResource(config),
+                                                       timeout);
+
+    DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                       RandomUtil.randomString(),
+                                                       channelName,
+                                                       Thread.currentThread().getContextClassLoader().getResource(config),
+                                                       timeout);
+
+    dg1.start();
+    dg2.start();
+    dg3.start();
+
+    try
+    {
+       bg.broadcastConnectors();
+
+       // NOTE(review): the argument to waitForBroadcast is presumably a timeout in milliseconds - confirm.
+       boolean ok = dg1.waitForBroadcast(1000);
+       Assert.assertTrue(ok);
+       List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+       assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+       ok = dg2.waitForBroadcast(1000);
+       Assert.assertTrue(ok);
+       entries = dg2.getDiscoveryEntries();
+       assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+       ok = dg3.waitForBroadcast(1000);
+       Assert.assertTrue(ok);
+       entries = dg3.getDiscoveryEntries();
+       assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+    }
+    finally
+    {
+       // Stop the groups even when an assertion above fails, so that a failing run
+       // does not leave started groups behind for the tests that follow.
+       bg.stop();
+
+       dg1.stop();
+       dg2.stop();
+       dg3.stop();
+    }
+ }
+
+ /**
+  * Checks the notifications a discovery group sends to its notification service:
+  * none before start, one DISCOVERY_GROUP_STARTED after start, and a second
+  * DISCOVERY_GROUP_STOPPED after stop, each carrying the group's name in the
+  * "name" property.
+  *
+  * @throws Exception if the discovery group cannot be started or stopped
+  */
+ public void testDiscoveryGroupNotifications() throws Exception
+ {
+    final int timeout = 500;
+
+    SimpleNotificationService service = new SimpleNotificationService();
+    SimpleNotificationService.Listener recorder = new SimpleNotificationService.Listener();
+    service.addNotificationListener(recorder);
+
+    DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                      RandomUtil.randomString(),
+                                                      channelName,
+                                                      Thread.currentThread().getContextClassLoader().getResource(config),
+                                                      timeout);
+    dg.setNotificationService(service);
+
+    // Nothing may be reported before the group is started.
+    Assert.assertEquals(0, recorder.getNotifications().size());
+
+    dg.start();
+
+    Assert.assertEquals(1, recorder.getNotifications().size());
+    Notification started = recorder.getNotifications().get(0);
+    Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STARTED, started.getType());
+    String startedName = started.getProperties().getSimpleStringProperty(new SimpleString("name")).toString();
+    Assert.assertEquals(dg.getName(), startedName);
+
+    dg.stop();
+
+    Assert.assertEquals(2, recorder.getNotifications().size());
+    Notification stopped = recorder.getNotifications().get(1);
+    Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STOPPED, stopped.getType());
+    String stoppedName = stopped.getProperties().getSimpleStringProperty(new SimpleString("name")).toString();
+    Assert.assertEquals(dg.getName(), stoppedName);
+ }
+
+ /**
+  * Checks the notifications a broadcast group sends to its notification service:
+  * none before start, one BROADCAST_GROUP_STARTED after start, and a second
+  * BROADCAST_GROUP_STOPPED after stop, each carrying the group's name in the
+  * "name" property.
+  *
+  * @throws Exception if the broadcast group cannot be started or stopped
+  */
+ public void testBroadcastGroupNotifications() throws Exception
+ {
+    SimpleNotificationService service = new SimpleNotificationService();
+    SimpleNotificationService.Listener recorder = new SimpleNotificationService.Listener();
+    service.addNotificationListener(recorder);
+
+    // Only the configuration file is supplied here; the connector list is left empty.
+    Map<String,Object> groupParams = new HashMap<String,Object>();
+    groupParams.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+    BroadcastGroupConfiguration groupConf = new BroadcastGroupConfiguration(RandomUtil.randomString(), JGroupsBroadcastGroupImpl.class.getName(), groupParams, new ArrayList<TransportConfiguration>());
+    BroadcastGroup bg = new JGroupsBroadcastGroupImpl(RandomUtil.randomString(), groupConf.getName(), true, groupConf);
+    bg.setNotificationService(service);
+
+    // Nothing may be reported before the group is started.
+    Assert.assertEquals(0, recorder.getNotifications().size());
+
+    bg.start();
+
+    Assert.assertEquals(1, recorder.getNotifications().size());
+    Notification started = recorder.getNotifications().get(0);
+    Assert.assertEquals(NotificationType.BROADCAST_GROUP_STARTED, started.getType());
+    String startedName = started.getProperties().getSimpleStringProperty(new SimpleString("name")).toString();
+    Assert.assertEquals(bg.getName(), startedName);
+
+    bg.stop();
+
+    Assert.assertEquals(2, recorder.getNotifications().size());
+    Notification stopped = recorder.getNotifications().get(1);
+    Assert.assertEquals(NotificationType.BROADCAST_GROUP_STOPPED, stopped.getType());
+    String stoppedName = stopped.getProperties().getSimpleStringProperty(new SimpleString("name")).toString();
+    Assert.assertEquals(bg.getName(), stoppedName);
+ }
+
+ /**
+  * Builds a transport configuration whose class name, name and parameter keys are
+  * freshly generated UUID strings, with one integer, one string and one boolean
+  * parameter value.
+  *
+  * @return the newly created transport configuration
+  */
+ private TransportConfiguration generateTC()
+ {
+    String factoryClass = "org.foo.bar." + UUIDGenerator.getInstance().generateStringUUID();
+    String connectorName = UUIDGenerator.getInstance().generateStringUUID();
+
+    Map<String, Object> settings = new HashMap<String, Object>();
+
+    String intKey = UUIDGenerator.getInstance().generateStringUUID();
+    settings.put(intKey, Integer.valueOf(123));
+
+    String stringKey = UUIDGenerator.getInstance().generateStringUUID();
+    String stringValue = UUIDGenerator.getInstance().generateStringUUID();
+    settings.put(stringKey, stringValue);
+
+    String booleanKey = UUIDGenerator.getInstance().generateStringUUID();
+    settings.put(booleanKey, Boolean.TRUE);
+
+    return new TransportConfiguration(factoryClass, settings, connectorName);
+ }
+
+ /**
+  * Discovery listener that only records whether it has been notified.
+  * Tests read the flag and reset it to {@code false} between steps.
+  */
+ private static class MyListener implements DiscoveryListener
+ {
+    /** Set to {@code true} by {@link #connectorsChanged()}; declared volatile. */
+    volatile boolean called;
+
+    /** Marks this listener as having been notified. */
+    public void connectorsChanged()
+    {
+       this.called = true;
+    }
+ }
+
+
+ /**
+  * Asserts that the connectors held by the discovered entries are the expected
+  * transport configurations, ignoring the order of both lists. Copies of the two
+  * lists are sorted by the connector's {@code toString()} value (descending) and
+  * then compared element by element.
+  *
+  * @param expected the transport configurations that should have been discovered
+  * @param actual the entries reported by the discovery group; must not be null
+  */
+ private static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
+ {
+    assertNotNull(actual);
+
+    Comparator<TransportConfiguration> byTextDescending = new Comparator<TransportConfiguration>()
+    {
+       public int compare(TransportConfiguration o1, TransportConfiguration o2)
+       {
+          String second = o2.toString();
+          String first = o1.toString();
+          return second.compareTo(first);
+       }
+    };
+    List<TransportConfiguration> wantedInOrder = new ArrayList<TransportConfiguration>(expected);
+    Collections.sort(wantedInOrder, byTextDescending);
+
+    Comparator<DiscoveryEntry> byConnectorTextDescending = new Comparator<DiscoveryEntry>()
+    {
+       public int compare(DiscoveryEntry o1, DiscoveryEntry o2)
+       {
+          String second = o2.getConnector().toString();
+          String first = o1.getConnector().toString();
+          return second.compareTo(first);
+       }
+    };
+    List<DiscoveryEntry> foundInOrder = new ArrayList<DiscoveryEntry>(actual);
+    Collections.sort(foundInOrder, byConnectorTextDescending);
+
+    // Sizes must agree before the pairwise comparison below.
+    assertEquals(wantedInOrder.size(), foundInOrder.size());
+
+    int position = 0;
+    for (TransportConfiguration wanted : wantedInOrder)
+    {
+       assertEquals(wanted, foundInOrder.get(position).getConnector());
+       position++;
+    }
+ }
+
+}
More information about the hornetq-commits
mailing list