Author: igarashitm
Date: 2011-04-03 05:50:09 -0400 (Sun, 03 Apr 2011)
New Revision: 10440
Added:
branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
Modified:
branches/HORNETQ-316/pom.xml
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
Log:
Added JGroupsDiscoveryTest and made it pass.
Modified: branches/HORNETQ-316/pom.xml
===================================================================
--- branches/HORNETQ-316/pom.xml 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/pom.xml 2011-04-03 09:50:09 UTC (rev 10440)
@@ -259,7 +259,7 @@
<dependency>
<groupId>jgroups</groupId>
<artifactId>jgroups</artifactId>
- <version>2.3</version>
+ <version>2.6.15.GA</version>
</dependency>
<!-- needed to compile the tests-->
<dependency>
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-04-02
03:53:14 UTC (rev 10439)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -86,8 +86,12 @@
{
this.localBindAddress = null;
}
-
- this.groupAddress =
InetAddress.getByName(ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME,
null, params));
+
+ String gaddr =
ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null,
params);
+ if(gaddr != null)
+ {
+ this.groupAddress = InetAddress.getByName(gaddr);
+ }
this.groupPort =
ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, params);
this.refreshTimeout =
ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME,
ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
this.initialWaitTimeout =
ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME,
HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
Modified:
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-04-02
03:53:14 UTC (rev 10439)
+++
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -34,6 +34,7 @@
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -105,8 +106,8 @@
}
Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
- int localPort =
Integer.parseInt((String)params.get(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME));
- String localAddr =
(String)params.get(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME);
+ int localPort =
ConfigurationHelper.getIntProperty(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME, -1,
params);
+ String localAddr =
ConfigurationHelper.getStringProperty(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME,
null, params);
InetAddress localAddress = null;
if(localAddr!=null)
@@ -225,11 +226,10 @@
byte[] data = buff.toByteBuffer().array();
Map<String,Object> params = broadcastGroupConfiguration.getParams();
- int groupPort =
Integer.parseInt((String)params.get(BroadcastGroupConstants.GROUP_PORT_NAME));
- String groupAddr = (String)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
- InetAddress groupAddress = InetAddress.getByName(groupAddr);
+ Integer groupPort = (Integer)params.get(BroadcastGroupConstants.GROUP_PORT_NAME);
+ InetAddress groupAddr =
(InetAddress)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
- DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress,
groupPort);
+ DatagramPacket packet = new DatagramPacket(data, data.length, groupAddr,
groupPort);
socket.send(packet);
}
Modified:
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java 2011-04-02
03:53:14 UTC (rev 10439)
+++
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -23,5 +23,8 @@
public class BroadcastGroupConstants
{
public static final String JGROUPS_CONFIGURATION_FILE_NAME =
"jgroups-configuration-file";
- public static final Object BROADCAST_PERIOD_NAME = "broadcast-period";
+ public static final String BROADCAST_PERIOD_NAME = "broadcast-period";
+ public static final String JGROUPS_CHANNEL_NAME_NAME =
"jgroups-channel-name";
+
+ public static final String DEFAULT_JGROUPS_CHANNEL_NAME =
"hornetq-jgroups-channel";
}
Modified:
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java 2011-04-02
03:53:14 UTC (rev 10439)
+++
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -25,4 +25,7 @@
public static final String JGROUPS_CONFIGURATION_FILE_NAME =
"jgroups-configuration-filename";
public static final String INITIAL_WAIT_TIMEOUT_NAME =
"initial-wait-timeout";
public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+ public static final String JGROUPS_CHANNEL_NAME_NAME =
"jgroups-channel-name";
+
+ public static final String DEFAULT_JGROUPS_CHANNEL_NAME =
"hornetq-jgroups-channel";
}
Modified:
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java 2011-04-02
03:53:14 UTC (rev 10439)
+++
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -13,7 +13,6 @@
package org.hornetq.integration.discovery.jgroups;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
@@ -53,10 +52,12 @@
private final BroadcastGroupConfiguration broadcastGroupConfiguration;
- private final List<TransportConfiguration> connectors = new
ArrayList<TransportConfiguration>();
+ private final List<TransportConfiguration> connectors;
private String jgroupsConfigurationFileName;
+ private String jgroupsChannelName = null;
+
private JChannel broadcastChannel;
private boolean started;
@@ -84,6 +85,8 @@
this.broadcastGroupConfiguration = config;
+ this.connectors = config.getConnectorList();
+
uniqueID = UUIDGenerator.getInstance().generateStringUUID();
}
@@ -100,11 +103,11 @@
}
Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
- this.jgroupsConfigurationFileName =
ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME,
null, params);
-
+ this.jgroupsConfigurationFileName =
ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME,
null, params);
+ this.jgroupsChannelName =
ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME,
BroadcastGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME, params);
this.broadcastChannel = new
JChannel(Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName));
- this.broadcastChannel.connect(this.name);
+ this.broadcastChannel.connect(this.jgroupsChannelName);
started = true;
@@ -127,7 +130,15 @@
if (future != null)
{
future.cancel(false);
+ future = null;
}
+
+ if (broadcastChannel != null)
+ {
+ broadcastChannel.shutdown();
+ broadcastChannel.close();
+ broadcastChannel = null;
+ }
started = false;
Modified:
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java 2011-04-02
03:53:14 UTC (rev 10439)
+++
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -51,6 +51,8 @@
private final String name;
+ private final String jgroupsChannelName;
+
private final URL configURL;
private final String nodeID;
@@ -73,11 +75,13 @@
public JGroupsDiscoveryGroupImpl(final String nodeID,
final String name,
+ final String channelName,
final URL confURL,
final long timeout)
{
this.nodeID = nodeID;
this.name = name;
+ this.jgroupsChannelName = channelName;
this.configURL = confURL;
this.timeout = timeout;
}
@@ -100,7 +104,7 @@
this.discoveryChannel.setReceiver(this);
- this.discoveryChannel.connect(this.name);
+ this.discoveryChannel.connect(this.jgroupsChannelName);
}
catch(Exception e)
{
@@ -141,6 +145,8 @@
this.discoveryChannel.shutdown();
+ this.discoveryChannel.close();
+
this.discoveryChannel = null;
if (notificationService != null)
@@ -164,6 +170,11 @@
return this.name;
}
+ public String getJGroupsChannelName()
+ {
+ return this.jgroupsChannelName;
+ }
+
public List<DiscoveryEntry> getDiscoveryEntries()
{
List<DiscoveryEntry> list = new ArrayList<DiscoveryEntry>();
Modified:
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
===================================================================
---
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java 2011-04-02
03:53:14 UTC (rev 10439)
+++
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -50,6 +50,8 @@
private String jgroupsConfigurationFileName;
+ private String jgroupsChannelName;
+
private long initialWaitTimeout;
private long refreshTimeout;
@@ -69,6 +71,7 @@
Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+ this.jgroupsChannelName =
ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CHANNEL_NAME_NAME,
DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME, params);
this.initialWaitTimeout =
ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME,
HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
this.refreshTimeout =
ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME,
ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
@@ -77,6 +80,7 @@
this.discoveryGroup = new JGroupsDiscoveryGroupImpl(getNodeID(),
this.discoveryGroupName,
+ this.jgroupsChannelName,
Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName),
this.refreshTimeout);
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
(rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml 2011-04-03 09:50:09 UTC
(rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000"
desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
(rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml 2011-04-03 09:50:09 UTC
(rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir_2"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000"
desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
(rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml 2011-04-03 09:50:09 UTC
(rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir_3"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000"
desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified:
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
===================================================================
---
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2011-04-02
03:53:14 UTC (rev 10439)
+++
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -155,10 +155,10 @@
return;
}
- log.info("Local address is " + localAddress);
+ log.info("Local address is " + localAddress.getHostAddress());
Map<String,Object> params = new HashMap<String,Object>();
- params.put(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME, localAddress);
+ params.put(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME,
localAddress.getHostAddress());
params.put(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME, 6552);
params.put(BroadcastGroupConstants.GROUP_ADDRESS_NAME, groupAddress);
params.put(BroadcastGroupConstants.GROUP_PORT_NAME, groupPort);
Added:
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
===================================================================
---
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
(rev 0)
+++
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java 2011-04-03
09:50:09 UTC (rev 10440)
@@ -0,0 +1,719 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.discovery;
+
+ import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.integration.discovery.jgroups.BroadcastGroupConstants;
+import org.hornetq.integration.discovery.jgroups.DiscoveryGroupConstants;
+import org.hornetq.integration.discovery.jgroups.JGroupsBroadcastGroupImpl;
+import org.hornetq.integration.discovery.jgroups.JGroupsDiscoveryGroupImpl;
+import org.hornetq.tests.integration.SimpleNotificationService;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A DiscoveryTest
+ *
+ * @author <a href="mailto:tm.igarashi@gmail.com">Tomohisa Igarashi</a>
+ *
+ */
+public class JGroupsDiscoveryTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(DiscoveryTest.class);
+
+ private static final String channelName =
DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME;
+
+ private static final String channelName2 =
DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "2";
+
+ private static final String channelName3 =
DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "3";
+
+ private static final String config = "test-jgroups-file_ping.xml";
+
+ private static final String config2 = "test-jgroups-file_ping_2.xml";
+
+ private static final String config3 = "test-jgroups-file_ping_3.xml";
+
+ public void testSimpleBroadcast() throws Exception
+ {
+ final long timeout = 500;
+
+ final String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new
ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params,
RandomUtil.randomString(), connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(),
true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+
Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(10000);
+
+ Assert.assertTrue(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+ public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
+ {
+ final int timeout = 500;
+
+ final String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new
ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params,
RandomUtil.randomString(), connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(),
true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+
Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ Assert.assertTrue(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+
+ dg.start();
+
+ bg.start();
+
+ bg.broadcastConnectors();
+
+ ok = dg.waitForBroadcast(1000);
+
+ Assert.assertTrue(ok);
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testIgnoreTrafficFromOwnNode() throws Exception
+ {
+ final int timeout = 500;
+
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new
ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params,
RandomUtil.randomString(), connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(),
true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(nodeID,
+ RandomUtil.randomString(),
+ channelName,
+
Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ Assert.assertFalse(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+
+ Assert.assertNotNull(entries);
+
+ Assert.assertEquals(0, entries.size());
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+
+ public void testMultipleGroups() throws Exception
+ {
+ final int timeout = 500;
+
+ String node1 = RandomUtil.randomString();
+
+ String node2 = RandomUtil.randomString();
+
+ String node3 = RandomUtil.randomString();
+
+ Map<String,Object> params1 = new HashMap<String,Object>();
+ params1.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params1.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors1 = new
ArrayList<TransportConfiguration>();
+ connectors1.add(live1);
+ BroadcastGroupConfiguration broadcastConf1 = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params1,
RandomUtil.randomString(), connectors1);
+ BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(),
true, broadcastConf1);
+ bg1.start();
+
+ Map<String,Object> params2 = new HashMap<String,Object>();
+ params2.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config2);
+ params2.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName2);
+ TransportConfiguration live2 = generateTC();
+ List<TransportConfiguration> connectors2 = new
ArrayList<TransportConfiguration>();
+ connectors2.add(live2);
+ BroadcastGroupConfiguration broadcastConf2 = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params2,
RandomUtil.randomString(), connectors2);
+ BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(),
true, broadcastConf2);
+ bg2.start();
+
+ Map<String,Object> params3 = new HashMap<String,Object>();
+ params3.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config3);
+ params3.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName3);
+ TransportConfiguration live3 = generateTC();
+ List<TransportConfiguration> connectors3 = new
ArrayList<TransportConfiguration>();
+ connectors3.add(live3);
+ BroadcastGroupConfiguration broadcastConf3 = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params3,
RandomUtil.randomString(), connectors3);
+ BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(),
true, broadcastConf3);
+ bg3.start();
+
+ DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+
Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+ dg1.start();
+
+ DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName2,
+
Thread.currentThread().getContextClassLoader().getResource(config2),
+ timeout);
+ dg2.start();
+
+ DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName3,
+
Thread.currentThread().getContextClassLoader().getResource(config3),
+ timeout);
+ dg3.start();
+
+ bg1.broadcastConnectors();
+
+ bg2.broadcastConnectors();
+
+ bg3.broadcastConnectors();
+
+ boolean ok = dg1.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ ok = dg2.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg2.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live2), entries);
+
+ ok = dg3.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg3.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live3), entries);
+
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg1.stop();
+ dg2.stop();
+ dg3.stop();
+ }
+
+   /**
+    * Checks listener notification on a single discovery group: every registered
+    * listener is called when a broadcast delivers connectors, and none is called
+    * again when a second broadcast delivers the same connectors.
+    * <p>
+    * Both groups are stopped from a finally block so that a failed assertion does
+    * not leave them running while the following tests execute.
+    *
+    * @throws Exception if a group cannot be started, used or stopped
+    */
+   public void testDiscoveryListenersCalled() throws Exception
+   {
+      final int timeout = 500;
+
+      String nodeID = RandomUtil.randomString();
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      connectors.add(live1);
+      BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(),
+                                                                                  params,
+                                                                                  RandomUtil.randomString(),
+                                                                                  connectors);
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+      bg.start();
+
+      // Assigned only after dg.start() returns, so cleanup never stops an unstarted group.
+      DiscoveryGroup startedGroup = null;
+      try
+      {
+         DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                           RandomUtil.randomString(),
+                                                           channelName,
+                                                           Thread.currentThread().getContextClassLoader().getResource(config),
+                                                           timeout);
+
+         MyListener listener1 = new MyListener();
+         MyListener listener2 = new MyListener();
+         MyListener listener3 = new MyListener();
+
+         dg.registerListener(listener1);
+         dg.registerListener(listener2);
+         dg.registerListener(listener3);
+
+         dg.start();
+         startedGroup = dg;
+
+         bg.broadcastConnectors();
+         boolean ok = dg.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+
+         Assert.assertTrue(listener1.called);
+         Assert.assertTrue(listener2.called);
+         Assert.assertTrue(listener3.called);
+
+         listener1.called = false;
+         listener2.called = false;
+         listener3.called = false;
+
+         bg.broadcastConnectors();
+         ok = dg.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+
+         // Won't be called since connectors haven't changed
+         Assert.assertFalse(listener1.called);
+         Assert.assertFalse(listener2.called);
+         Assert.assertFalse(listener3.called);
+      }
+      finally
+      {
+         // Same order as the success path always used: broadcaster first, then the discovery group.
+         try
+         {
+            bg.stop();
+         }
+         finally
+         {
+            if (startedGroup != null)
+            {
+               startedGroup.stop();
+            }
+         }
+      }
+   }
+
+ public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
+ {
+ final int timeout = 500;
+ // Scenario: three broadcasters feed one discovery group; the test follows its entry list and listener callbacks as connectors are broadcast, removed and (after sleeping for timeout) dropped.
+ String node1 = RandomUtil.randomString();
+ String node2 = RandomUtil.randomString();
+ String node3 = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ // The same params map is reused for all three broadcasters; each one gets its own single-connector list.
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors1 = new
ArrayList<TransportConfiguration>();
+ connectors1.add(live1);
+ BroadcastGroupConfiguration broadcastConf1 = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params,
RandomUtil.randomString(), connectors1);
+ BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(),
true, broadcastConf1);
+ bg1.start();
+
+ TransportConfiguration live2 = generateTC();
+ List<TransportConfiguration> connectors2 = new
ArrayList<TransportConfiguration>();
+ connectors2.add(live2);
+ BroadcastGroupConfiguration broadcastConf2 = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params,
RandomUtil.randomString(), connectors2);
+ BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(),
true, broadcastConf2);
+ bg2.start();
+
+ TransportConfiguration live3 = generateTC();
+ List<TransportConfiguration> connectors3 = new
ArrayList<TransportConfiguration>();
+ connectors3.add(live3);
+ BroadcastGroupConfiguration broadcastConf3 = new
BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params,
RandomUtil.randomString(), connectors3);
+ BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(),
true, broadcastConf3);
+ bg3.start();
+ // A single discovery group on the same channel; timeout is handed over as its last constructor argument.
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+
Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ MyListener listener1 = new MyListener();
+ dg.registerListener(listener1);
+ MyListener listener2 = new MyListener();
+ dg.registerListener(listener2);
+
+ dg.start();
+ // First broadcast from bg1: live1 becomes the only entry and both listeners are called.
+ bg1.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+ // First broadcast from bg2: live2 is added and the listeners are called again.
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+ // First broadcast from bg3: live3 is added and the listeners are called again.
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+ // Repeat broadcasts from each node carry nothing new: entries are unchanged and the listeners must stay uncalled.
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+ // bg2 drops live2 and broadcasts once more.
+ bg2.removeConnector(live2);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ // Connector2 should still be there since not timed out yet
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+ // Sleep for the timeout; after the next round of broadcasts live2 is expected to be gone.
+ Thread.sleep(timeout);
+ // NOTE(review): from here on the waitForBroadcast results are stored in ok but never asserted - confirm whether that is intentional.
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live3), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+ // Remove the two remaining connectors, sleep for the timeout again, then expect an empty entry list.
+ bg1.removeConnector(live1);
+ bg3.removeConnector(live3);
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ Assert.assertNotNull(entries);
+ Assert.assertEquals(0, entries.size());
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+ // One more round with nothing left to advertise: the list stays empty and the listeners are not called.
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ Assert.assertNotNull(entries);
+ Assert.assertEquals(0, entries.size());
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ // Stop the broadcasters first, then the discovery group.
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg.stop();
+ }
+
+   /**
+    * Checks that one broadcast reaches several discovery groups listening on the
+    * same channel: each of the three groups must observe the broadcast and report
+    * live1 as its only entry.
+    * <p>
+    * The broadcaster and every started discovery group are stopped from a finally
+    * block so that a failed assertion does not leave them running while the
+    * following tests execute.
+    *
+    * @throws Exception if a group cannot be started, used or stopped
+    */
+   public void testMultipleDiscoveryGroups() throws Exception
+   {
+      final int timeout = 500;
+      final int groupCount = 3;
+
+      String nodeID = RandomUtil.randomString();
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      connectors.add(live1);
+      BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(),
+                                                                                  params,
+                                                                                  RandomUtil.randomString(),
+                                                                                  connectors);
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+      bg.start();
+
+      // Holds only the groups whose start() returned, so cleanup never stops an unstarted group.
+      List<DiscoveryGroup> startedGroups = new ArrayList<DiscoveryGroup>();
+      try
+      {
+         // As before, all groups are constructed first and only then started.
+         List<DiscoveryGroup> groups = new ArrayList<DiscoveryGroup>();
+         for (int i = 0; i < groupCount; i++)
+         {
+            groups.add(new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                     RandomUtil.randomString(),
+                                                     channelName,
+                                                     Thread.currentThread().getContextClassLoader().getResource(config),
+                                                     timeout));
+         }
+
+         for (DiscoveryGroup dg : groups)
+         {
+            dg.start();
+            startedGroups.add(dg);
+         }
+
+         bg.broadcastConnectors();
+
+         for (DiscoveryGroup dg : groups)
+         {
+            boolean ok = dg.waitForBroadcast(1000);
+            Assert.assertTrue(ok);
+            List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+            assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+         }
+      }
+      finally
+      {
+         // Same order as the success path always used: broadcaster first, then the discovery groups.
+         try
+         {
+            bg.stop();
+         }
+         finally
+         {
+            for (DiscoveryGroup dg : startedGroups)
+            {
+               dg.stop();
+            }
+         }
+      }
+   }
+
+   /**
+    * Checks the notifications emitted by a discovery group: none before it is
+    * started, a DISCOVERY_GROUP_STARTED notification after start() and a
+    * DISCOVERY_GROUP_STOPPED notification after stop(), each carrying the group's
+    * name in its "name" property.
+    *
+    * @throws Exception if the group cannot be started or stopped
+    */
+   public void testDiscoveryGroupNotifications() throws Exception
+   {
+      SimpleNotificationService service = new SimpleNotificationService();
+      SimpleNotificationService.Listener recorder = new SimpleNotificationService.Listener();
+      service.addNotificationListener(recorder);
+
+      final int timeout = 500;
+      final SimpleString nameKey = new SimpleString("name");
+
+      DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                        RandomUtil.randomString(),
+                                                        channelName,
+                                                        Thread.currentThread().getContextClassLoader().getResource(config),
+                                                        timeout);
+      dg.setNotificationService(service);
+
+      // Nothing may be reported before the group is started.
+      Assert.assertEquals(0, recorder.getNotifications().size());
+
+      dg.start();
+
+      Assert.assertEquals(1, recorder.getNotifications().size());
+      Notification startedNotif = recorder.getNotifications().get(0);
+      Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STARTED, startedNotif.getType());
+      String startedName = startedNotif.getProperties().getSimpleStringProperty(nameKey).toString();
+      Assert.assertEquals(dg.getName(), startedName);
+
+      dg.stop();
+
+      Assert.assertEquals(2, recorder.getNotifications().size());
+      Notification stoppedNotif = recorder.getNotifications().get(1);
+      Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STOPPED, stoppedNotif.getType());
+      String stoppedName = stoppedNotif.getProperties().getSimpleStringProperty(nameKey).toString();
+      Assert.assertEquals(dg.getName(), stoppedName);
+   }
+
+   /**
+    * Checks the notifications emitted by a broadcast group: none before it is
+    * started, a BROADCAST_GROUP_STARTED notification after start() and a
+    * BROADCAST_GROUP_STOPPED notification after stop(), each carrying the group's
+    * name in its "name" property.
+    *
+    * @throws Exception if the group cannot be started or stopped
+    */
+   public void testBroadcastGroupNotifications() throws Exception
+   {
+      SimpleNotificationService service = new SimpleNotificationService();
+      SimpleNotificationService.Listener recorder = new SimpleNotificationService.Listener();
+      service.addNotificationListener(recorder);
+
+      // Only the configuration file is supplied here; the group is created with an empty connector list.
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(),
+                                                                                  params,
+                                                                                  RandomUtil.randomString(),
+                                                                                  new ArrayList<TransportConfiguration>());
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(RandomUtil.randomString(),
+                                                        broadcastConf.getName(),
+                                                        true,
+                                                        broadcastConf);
+      bg.setNotificationService(service);
+
+      final SimpleString nameKey = new SimpleString("name");
+
+      // Nothing may be reported before the group is started.
+      Assert.assertEquals(0, recorder.getNotifications().size());
+
+      bg.start();
+
+      Assert.assertEquals(1, recorder.getNotifications().size());
+      Notification startedNotif = recorder.getNotifications().get(0);
+      Assert.assertEquals(NotificationType.BROADCAST_GROUP_STARTED, startedNotif.getType());
+      String startedName = startedNotif.getProperties().getSimpleStringProperty(nameKey).toString();
+      Assert.assertEquals(bg.getName(), startedName);
+
+      bg.stop();
+
+      Assert.assertEquals(2, recorder.getNotifications().size());
+      Notification stoppedNotif = recorder.getNotifications().get(1);
+      Assert.assertEquals(NotificationType.BROADCAST_GROUP_STOPPED, stoppedNotif.getType());
+      String stoppedName = stoppedNotif.getProperties().getSimpleStringProperty(nameKey).toString();
+      Assert.assertEquals(bg.getName(), stoppedName);
+   }
+
+   /**
+    * Builds a transport configuration from freshly generated UUID strings: the
+    * factory class name is "org.foo.bar." plus a UUID, the name is a UUID, and the
+    * three parameter keys are UUIDs mapped to an int, a UUID string and a boolean.
+    *
+    * @return the newly created configuration
+    */
+   private TransportConfiguration generateTC()
+   {
+      UUIDGenerator uuids = UUIDGenerator.getInstance();
+
+      String factoryClassName = "org.foo.bar." + uuids.generateStringUUID();
+      String configName = uuids.generateStringUUID();
+
+      Map<String, Object> settings = new HashMap<String, Object>();
+      settings.put(uuids.generateStringUUID(), 123);
+      // Key is generated before the value, matching the original evaluation order.
+      String stringKey = uuids.generateStringUUID();
+      settings.put(stringKey, uuids.generateStringUUID());
+      settings.put(uuids.generateStringUUID(), true);
+
+      return new TransportConfiguration(factoryClassName, settings, configName);
+   }
+
+   /**
+    * Discovery listener that does nothing except remember that it was invoked.
+    * The tests read the {@code called} flag and reset it by hand between steps.
+    */
+   private static class MyListener implements DiscoveryListener
+   {
+      // volatile: presumably written by a discovery thread and read by the test thread - TODO confirm
+      volatile boolean called = false;
+
+      /** Records that a connector change was reported. */
+      public void connectorsChanged()
+      {
+         this.called = true;
+      }
+   }
+
+
+   /**
+    * Asserts that the connectors of {@code actual} match {@code expected}
+    * regardless of list order. Copies of both lists are sorted in descending order
+    * of the connector's {@code toString()} value and then compared element by
+    * element; the lists passed in are not modified.
+    *
+    * @param expected the connectors that should have been discovered
+    * @param actual the entries reported by a discovery group; must not be null
+    */
+   private static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
+   {
+      assertNotNull(actual);
+
+      Comparator<TransportConfiguration> byConnectorText = new Comparator<TransportConfiguration>()
+      {
+         public int compare(TransportConfiguration left, TransportConfiguration right)
+         {
+            String leftText = left.toString();
+            String rightText = right.toString();
+            return rightText.compareTo(leftText);
+         }
+      };
+      Comparator<DiscoveryEntry> byEntryConnectorText = new Comparator<DiscoveryEntry>()
+      {
+         public int compare(DiscoveryEntry left, DiscoveryEntry right)
+         {
+            String leftText = left.getConnector().toString();
+            String rightText = right.getConnector().toString();
+            return rightText.compareTo(leftText);
+         }
+      };
+
+      List<TransportConfiguration> wanted = new ArrayList<TransportConfiguration>(expected);
+      Collections.sort(wanted, byConnectorText);
+
+      List<DiscoveryEntry> found = new ArrayList<DiscoveryEntry>(actual);
+      Collections.sort(found, byEntryConnectorText);
+
+      assertEquals(wanted.size(), found.size());
+
+      int position = 0;
+      for (TransportConfiguration connector : wanted)
+      {
+         assertEquals(connector, found.get(position).getConnector());
+         position++;
+      }
+   }
+
+}
Property changes on:
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain