[hornetq-commits] JBoss hornetq SVN: r10440 - in branches/HORNETQ-316: src/main/org/hornetq/core/client/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Apr 3 05:50:10 EDT 2011


Author: igarashitm
Date: 2011-04-03 05:50:09 -0400 (Sun, 03 Apr 2011)
New Revision: 10440

Added:
   branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
   branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
   branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
   branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
Modified:
   branches/HORNETQ-316/pom.xml
   branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
   branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
Log:
Added JGroupsDiscoveryTest and made it pass.


Modified: branches/HORNETQ-316/pom.xml
===================================================================
--- branches/HORNETQ-316/pom.xml	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/pom.xml	2011-04-03 09:50:09 UTC (rev 10440)
@@ -259,7 +259,7 @@
       <dependency>
          <groupId>jgroups</groupId>
          <artifactId>jgroups</artifactId>
-         <version>2.3</version>
+         <version>2.6.15.GA</version>
       </dependency>
       <!-- needed to compile the tests-->
       <dependency>

Modified: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -86,8 +86,12 @@
          {
             this.localBindAddress = null;
          }
-
-         this.groupAddress = InetAddress.getByName(ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params));
+         
+         String gaddr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params);
+         if(gaddr != null)
+         {
+            this.groupAddress = InetAddress.getByName(gaddr);
+         }
          this.groupPort = ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, params);
          this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
          this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);

Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -34,6 +34,7 @@
 import org.hornetq.core.server.cluster.BroadcastGroup;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.ConfigurationHelper;
 import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.UUIDGenerator;
 
@@ -105,8 +106,8 @@
       }
 
       Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
-      int localPort = Integer.parseInt((String)params.get(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME));
-      String localAddr = (String)params.get(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME);
+      int localPort = ConfigurationHelper.getIntProperty(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME, -1, params);
+      String localAddr = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, params);
 
       InetAddress localAddress = null;
       if(localAddr!=null)
@@ -225,11 +226,10 @@
       byte[] data = buff.toByteBuffer().array();
 
       Map<String,Object> params = broadcastGroupConfiguration.getParams();
-      int groupPort = Integer.parseInt((String)params.get(BroadcastGroupConstants.GROUP_PORT_NAME));
-      String groupAddr = (String)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
-      InetAddress groupAddress = InetAddress.getByName(groupAddr);
+      Integer groupPort = (Integer)params.get(BroadcastGroupConstants.GROUP_PORT_NAME);
+      InetAddress groupAddr = (InetAddress)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
       
-      DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
+      DatagramPacket packet = new DatagramPacket(data, data.length, groupAddr, groupPort);
 
       socket.send(packet);
    }

Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -23,5 +23,8 @@
 public class BroadcastGroupConstants
 {
    public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-file";
-   public static final Object BROADCAST_PERIOD_NAME = "broadcast-period";
+   public static final String BROADCAST_PERIOD_NAME = "broadcast-period";
+   public static final String JGROUPS_CHANNEL_NAME_NAME = "jgroups-channel-name";
+   
+   public static final String DEFAULT_JGROUPS_CHANNEL_NAME = "hornetq-jgroups-channel";   
 }

Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -25,4 +25,7 @@
    public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-filename";
    public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
    public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+   public static final String JGROUPS_CHANNEL_NAME_NAME = "jgroups-channel-name";
+   
+   public static final String DEFAULT_JGROUPS_CHANNEL_NAME = "hornetq-jgroups-channel";
 }

Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -13,7 +13,6 @@
 
 package org.hornetq.integration.discovery.jgroups;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledExecutorService;
@@ -53,10 +52,12 @@
 
    private final BroadcastGroupConfiguration broadcastGroupConfiguration;
    
-   private final List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+   private final List<TransportConfiguration> connectors;
 
    private String jgroupsConfigurationFileName;
    
+   private String jgroupsChannelName = null;
+   
    private JChannel broadcastChannel;
    
    private boolean started;
@@ -84,6 +85,8 @@
 
       this.broadcastGroupConfiguration = config;
       
+      this.connectors = config.getConnectorList();
+      
       uniqueID = UUIDGenerator.getInstance().generateStringUUID();
    }
    
@@ -100,11 +103,11 @@
       }
 
       Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
-      this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
-
+      this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
+      this.jgroupsChannelName = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, BroadcastGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME, params);
       this.broadcastChannel = new JChannel(Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName));
       
-      this.broadcastChannel.connect(this.name);
+      this.broadcastChannel.connect(this.jgroupsChannelName);
       
       started = true;
 
@@ -127,7 +130,15 @@
       if (future != null)
       {
          future.cancel(false);
+         future = null;
       }
+      
+      if (broadcastChannel != null)
+      {
+         broadcastChannel.shutdown();
+         broadcastChannel.close();
+         broadcastChannel = null;
+      }
 
       started = false;
 

Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -51,6 +51,8 @@
 
    private final String name;
    
+   private final String jgroupsChannelName;
+
    private final URL configURL;
    
    private final String nodeID;
@@ -73,11 +75,13 @@
    
    public JGroupsDiscoveryGroupImpl(final String nodeID,
                                     final String name,
+                                    final String channelName,
                                     final URL confURL,
                                     final long timeout)
    {
       this.nodeID = nodeID;
       this.name = name;
+      this.jgroupsChannelName = channelName;
       this.configURL = confURL;
       this.timeout = timeout;
    }
@@ -100,7 +104,7 @@
 
          this.discoveryChannel.setReceiver(this);
          
-         this.discoveryChannel.connect(this.name);
+         this.discoveryChannel.connect(this.jgroupsChannelName);
       }
       catch(Exception e)
       {
@@ -141,6 +145,8 @@
 
       this.discoveryChannel.shutdown();
 
+      this.discoveryChannel.close();
+      
       this.discoveryChannel = null;
 
       if (notificationService != null)
@@ -164,6 +170,11 @@
       return this.name;
    }
 
+   public String getJGroupsChannelName()
+   {
+      return this.jgroupsChannelName;
+   }
+   
    public List<DiscoveryEntry> getDiscoveryEntries()
    {
       List<DiscoveryEntry> list = new ArrayList<DiscoveryEntry>();

Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -50,6 +50,8 @@
    
    private String jgroupsConfigurationFileName;
 
+   private String jgroupsChannelName;
+   
    private long initialWaitTimeout;
    
    private long refreshTimeout;
@@ -69,6 +71,7 @@
          
          Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
 
+         this.jgroupsChannelName = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CHANNEL_NAME_NAME, DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME, params);
          this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
 
          this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
@@ -77,6 +80,7 @@
          
          this.discoveryGroup = new JGroupsDiscoveryGroupImpl(getNodeID(),
                                                              this.discoveryGroupName,
+                                                             this.jgroupsChannelName,
                                                              Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName),
                                                              this.refreshTimeout);
          

Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml	                        (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml	2011-04-03 09:50:09 UTC (rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+    <TCP loopback="true"
+         recv_buf_size="20000000"
+         send_buf_size="640000"
+         discard_incompatible_packets="true"
+         max_bundle_size="64000"
+         max_bundle_timeout="30"
+         enable_bundling="true"
+         use_send_queues="false"
+         sock_conn_timeout="300"
+         skip_suspected_members="true"
+
+         thread_pool.enabled="true"
+         thread_pool.min_threads="1"
+         thread_pool.max_threads="10"
+         thread_pool.keep_alive_time="5000"
+         thread_pool.queue_enabled="false"
+         thread_pool.queue_max_size="100"
+         thread_pool.rejection_policy="run"
+
+         oob_thread_pool.enabled="true"
+         oob_thread_pool.min_threads="1"
+         oob_thread_pool.max_threads="8"
+         oob_thread_pool.keep_alive_time="5000"
+         oob_thread_pool.queue_enabled="false"
+         oob_thread_pool.queue_max_size="100"
+         oob_thread_pool.rejection_policy="run"/>
+
+    <FILE_PING location="file_ping_dir"/>
+    <MERGE2 max_interval="30000"
+              min_interval="10000"/>
+    <FD_SOCK/>
+    <FD timeout="10000" max_tries="5" />
+    <VERIFY_SUSPECT timeout="1500"  />
+    <BARRIER />
+    <pbcast.NAKACK
+                   use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+    <UNICAST timeout="300,600,1200" />
+    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+    <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+                view_bundling="true"/>
+    <FC max_credits="2000000"
+        min_threshold="0.10"/>
+    <FRAG2 frag_size="60000"  />
+    <pbcast.STREAMING_STATE_TRANSFER/>
+    <pbcast.FLUSH timeout="0"/>
+</config>


Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml	                        (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml	2011-04-03 09:50:09 UTC (rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+    <TCP loopback="true"
+         recv_buf_size="20000000"
+         send_buf_size="640000"
+         discard_incompatible_packets="true"
+         max_bundle_size="64000"
+         max_bundle_timeout="30"
+         enable_bundling="true"
+         use_send_queues="false"
+         sock_conn_timeout="300"
+         skip_suspected_members="true"
+
+         thread_pool.enabled="true"
+         thread_pool.min_threads="1"
+         thread_pool.max_threads="10"
+         thread_pool.keep_alive_time="5000"
+         thread_pool.queue_enabled="false"
+         thread_pool.queue_max_size="100"
+         thread_pool.rejection_policy="run"
+
+         oob_thread_pool.enabled="true"
+         oob_thread_pool.min_threads="1"
+         oob_thread_pool.max_threads="8"
+         oob_thread_pool.keep_alive_time="5000"
+         oob_thread_pool.queue_enabled="false"
+         oob_thread_pool.queue_max_size="100"
+         oob_thread_pool.rejection_policy="run"/>
+
+    <FILE_PING location="file_ping_dir_2"/>
+    <MERGE2 max_interval="30000"
+              min_interval="10000"/>
+    <FD_SOCK/>
+    <FD timeout="10000" max_tries="5" />
+    <VERIFY_SUSPECT timeout="1500"  />
+    <BARRIER />
+    <pbcast.NAKACK
+                   use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+    <UNICAST timeout="300,600,1200" />
+    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+    <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+                view_bundling="true"/>
+    <FC max_credits="2000000"
+        min_threshold="0.10"/>
+    <FRAG2 frag_size="60000"  />
+    <pbcast.STREAMING_STATE_TRANSFER/>
+    <pbcast.FLUSH timeout="0"/>
+</config>


Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml	                        (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml	2011-04-03 09:50:09 UTC (rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+    <TCP loopback="true"
+         recv_buf_size="20000000"
+         send_buf_size="640000"
+         discard_incompatible_packets="true"
+         max_bundle_size="64000"
+         max_bundle_timeout="30"
+         enable_bundling="true"
+         use_send_queues="false"
+         sock_conn_timeout="300"
+         skip_suspected_members="true"
+
+         thread_pool.enabled="true"
+         thread_pool.min_threads="1"
+         thread_pool.max_threads="10"
+         thread_pool.keep_alive_time="5000"
+         thread_pool.queue_enabled="false"
+         thread_pool.queue_max_size="100"
+         thread_pool.rejection_policy="run"
+
+         oob_thread_pool.enabled="true"
+         oob_thread_pool.min_threads="1"
+         oob_thread_pool.max_threads="8"
+         oob_thread_pool.keep_alive_time="5000"
+         oob_thread_pool.queue_enabled="false"
+         oob_thread_pool.queue_max_size="100"
+         oob_thread_pool.rejection_policy="run"/>
+
+    <FILE_PING location="file_ping_dir_3"/>
+    <MERGE2 max_interval="30000"
+              min_interval="10000"/>
+    <FD_SOCK/>
+    <FD timeout="10000" max_tries="5" />
+    <VERIFY_SUSPECT timeout="1500"  />
+    <BARRIER />
+    <pbcast.NAKACK
+                   use_mcast_xmit="false" gc_lag="0"
+                   retransmit_timeout="300,600,1200,2400,4800"
+                   discard_delivered_msgs="true"/>
+    <UNICAST timeout="300,600,1200" />
+    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+                   max_bytes="400000"/>
+    <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+                view_bundling="true"/>
+    <FC max_credits="2000000"
+        min_threshold="0.10"/>
+    <FRAG2 frag_size="60000"  />
+    <pbcast.STREAMING_STATE_TRANSFER/>
+    <pbcast.FLUSH timeout="0"/>
+</config>


Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
___________________________________________________________________
Added: svn:mime-type
   + text/plain

Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java	2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -155,10 +155,10 @@
          return;
       }
       
-      log.info("Local address is " + localAddress);
+      log.info("Local address is " + localAddress.getHostAddress());
 
       Map<String,Object> params = new HashMap<String,Object>();
-      params.put(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME, localAddress);
+      params.put(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME, localAddress.getHostAddress());
       params.put(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME, 6552);
       params.put(BroadcastGroupConstants.GROUP_ADDRESS_NAME, groupAddress);
       params.put(BroadcastGroupConstants.GROUP_PORT_NAME, groupPort);

Added: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java	                        (rev 0)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java	2011-04-03 09:50:09 UTC (rev 10440)
@@ -0,0 +1,719 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.discovery;
+
+ import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.integration.discovery.jgroups.BroadcastGroupConstants;
+import org.hornetq.integration.discovery.jgroups.DiscoveryGroupConstants;
+import org.hornetq.integration.discovery.jgroups.JGroupsBroadcastGroupImpl;
+import org.hornetq.integration.discovery.jgroups.JGroupsDiscoveryGroupImpl;
+import org.hornetq.tests.integration.SimpleNotificationService;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A JGroupsDiscoveryTest
+ *
+ * @author <a href="mailto:tm.igarashi at gmail.com">Tomohisa Igarashi</a>
+ * 
+ */
+public class JGroupsDiscoveryTest extends UnitTestCase
+{
+   private static final Logger log = Logger.getLogger(DiscoveryTest.class);
+
+   private static final String channelName = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME;
+
+   private static final String channelName2 = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "2";
+
+   private static final String channelName3 = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "3";
+
+   private static final String config = "test-jgroups-file_ping.xml";
+
+   private static final String config2 = "test-jgroups-file_ping_2.xml";
+
+   private static final String config3 = "test-jgroups-file_ping_3.xml";
+
+   public void testSimpleBroadcast() throws Exception
+   {
+      final long timeout = 500;
+
+      final String nodeID = RandomUtil.randomString();
+
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      connectors.add(live1);
+      BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+      bg.start();
+
+      DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                        RandomUtil.randomString(),
+                                                        channelName,
+                                                        Thread.currentThread().getContextClassLoader().getResource(config),
+                                                        timeout);
+
+      dg.start();
+
+      bg.broadcastConnectors();
+
+      boolean ok = dg.waitForBroadcast(10000);
+
+      Assert.assertTrue(ok);
+
+      List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+      bg.stop();
+
+      dg.stop();
+
+   }
+   
+   public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
+   {
+      final int timeout = 500;
+
+      final String nodeID = RandomUtil.randomString();
+
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      connectors.add(live1);
+      BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+      bg.start();
+
+      DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                 RandomUtil.randomString(),
+                                                 channelName,
+                                                 Thread.currentThread().getContextClassLoader().getResource(config),
+                                                 timeout);
+
+      dg.start();
+
+      bg.broadcastConnectors();
+
+      boolean ok = dg.waitForBroadcast(1000);
+
+      Assert.assertTrue(ok);
+
+      List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+      bg.stop();
+
+      dg.stop();
+
+      dg.start();
+
+      bg.start();
+
+      bg.broadcastConnectors();
+
+      ok = dg.waitForBroadcast(1000);
+
+      Assert.assertTrue(ok);
+
+      entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+      
+      bg.stop();
+      
+      dg.stop();
+   }
+
+   public void testIgnoreTrafficFromOwnNode() throws Exception
+   {
+      final int timeout = 500;
+
+      String nodeID = RandomUtil.randomString();
+
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      connectors.add(live1);
+      BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+      bg.start();
+
+      DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(nodeID,
+                                                        RandomUtil.randomString(),
+                                                        channelName,
+                                                        Thread.currentThread().getContextClassLoader().getResource(config),
+                                                        timeout);
+
+      dg.start();
+
+      bg.broadcastConnectors();
+
+      boolean ok = dg.waitForBroadcast(1000);
+
+      Assert.assertFalse(ok);
+
+      List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+
+      Assert.assertNotNull(entries);
+
+      Assert.assertEquals(0, entries.size());
+
+      bg.stop();
+
+      dg.stop();
+
+   }
+
+
+   /**
+    * Verifies that broadcast/discovery pairs on different JGroups channels stay isolated.
+    * <p>
+    * Three broadcast groups are started, each with its own configuration file and channel
+    * name, plus one discovery group per channel. After every broadcast group has broadcast
+    * its single connector, each discovery group must report exactly the connector that was
+    * broadcast on its own channel.
+    * <p>
+    * The groups are stopped in a finally block so that a failing assertion does not leave
+    * them running for the tests that follow.
+    *
+    * @throws Exception if a group cannot be started, broadcast or stopped
+    */
+   public void testMultipleGroups() throws Exception
+   {
+      final int timeout = 500;
+
+      String node1 = RandomUtil.randomString();
+
+      String node2 = RandomUtil.randomString();
+
+      String node3 = RandomUtil.randomString();
+
+      // Broadcast group 1: first configuration file / channel name
+      Map<String,Object> params1 = new HashMap<String,Object>();
+      params1.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params1.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+      connectors1.add(live1);
+      BroadcastGroupConfiguration broadcastConf1 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params1, RandomUtil.randomString(), connectors1);
+      BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(), true, broadcastConf1);
+      bg1.start();
+
+      // Broadcast group 2: second configuration file / channel name
+      Map<String,Object> params2 = new HashMap<String,Object>();
+      params2.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config2);
+      params2.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName2);
+      TransportConfiguration live2 = generateTC();
+      List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+      connectors2.add(live2);
+      BroadcastGroupConfiguration broadcastConf2 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params2, RandomUtil.randomString(), connectors2);
+      BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(), true, broadcastConf2);
+      bg2.start();
+
+      // Broadcast group 3: third configuration file / channel name
+      Map<String,Object> params3 = new HashMap<String,Object>();
+      params3.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config3);
+      params3.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName3);
+      TransportConfiguration live3 = generateTC();
+      List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+      connectors3.add(live3);
+      BroadcastGroupConfiguration broadcastConf3 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params3, RandomUtil.randomString(), connectors3);
+      BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(), true, broadcastConf3);
+      bg3.start();
+
+      // One discovery group per channel, each using the matching configuration file
+      DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                         RandomUtil.randomString(),
+                                                         channelName,
+                                                         Thread.currentThread().getContextClassLoader().getResource(config),
+                                                         timeout);
+      dg1.start();
+
+      DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                         RandomUtil.randomString(),
+                                                         channelName2,
+                                                         Thread.currentThread().getContextClassLoader().getResource(config2),
+                                                         timeout);
+      dg2.start();
+
+      DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                         RandomUtil.randomString(),
+                                                         channelName3,
+                                                         Thread.currentThread().getContextClassLoader().getResource(config3),
+                                                         timeout);
+      dg3.start();
+
+      try
+      {
+         bg1.broadcastConnectors();
+
+         bg2.broadcastConnectors();
+
+         bg3.broadcastConnectors();
+
+         // Each discovery group must see only the connector of its own channel
+         boolean ok = dg1.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+         List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+         assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+         ok = dg2.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+         entries = dg2.getDiscoveryEntries();
+         assertEqualsDiscoveryEntries(Arrays.asList(live2), entries);
+
+         ok = dg3.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+         entries = dg3.getDiscoveryEntries();
+         assertEqualsDiscoveryEntries(Arrays.asList(live3), entries);
+      }
+      finally
+      {
+         // Same stop order as before, but now also executed when an assertion fails
+         bg1.stop();
+         bg2.stop();
+         bg3.stop();
+
+         dg1.stop();
+         dg2.stop();
+         dg3.stop();
+      }
+   }
+
+   /**
+    * Verifies that registered discovery listeners are notified when a broadcast changes
+    * the set of discovered connectors, and are not notified again when the same
+    * connectors are re-broadcast.
+    * <p>
+    * Both groups are stopped in a finally block so that a failing assertion does not
+    * leave them running for the tests that follow.
+    *
+    * @throws Exception if a group cannot be started, broadcast or stopped
+    */
+   public void testDiscoveryListenersCalled() throws Exception
+   {
+      final int timeout = 500;
+
+      String nodeID = RandomUtil.randomString();
+
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      connectors.add(live1);
+      BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+      bg.start();
+
+      DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                        RandomUtil.randomString(),
+                                                        channelName,
+                                                        Thread.currentThread().getContextClassLoader().getResource(config),
+                                                        timeout);
+
+      MyListener listener1 = new MyListener();
+      MyListener listener2 = new MyListener();
+      MyListener listener3 = new MyListener();
+
+      // Listeners are registered before the discovery group is started
+      dg.registerListener(listener1);
+      dg.registerListener(listener2);
+      dg.registerListener(listener3);
+
+      dg.start();
+
+      try
+      {
+         // First broadcast introduces a new connector: every listener must fire
+         bg.broadcastConnectors();
+         boolean ok = dg.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+
+         Assert.assertTrue(listener1.called);
+         Assert.assertTrue(listener2.called);
+         Assert.assertTrue(listener3.called);
+
+         listener1.called = false;
+         listener2.called = false;
+         listener3.called = false;
+
+         bg.broadcastConnectors();
+         ok = dg.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+
+         // Won't be called since connectors haven't changed
+         Assert.assertFalse(listener1.called);
+         Assert.assertFalse(listener2.called);
+         Assert.assertFalse(listener3.called);
+      }
+      finally
+      {
+         // Same stop order as before, but now also executed when an assertion fails
+         bg.stop();
+
+         dg.stop();
+      }
+   }
+
+   /**
+    * Exercises one discovery group that listens to three broadcast groups sharing the
+    * same configuration file and channel name.
+    * <p>
+    * The test walks through these phases and checks both the discovery entries and
+    * whether the two registered listeners were called:
+    * <ol>
+    * <li>each broadcaster announces its connector for the first time (entries grow,
+    * listeners called);</li>
+    * <li>each broadcaster re-announces the same connector (entries unchanged, listeners
+    * not called);</li>
+    * <li>broadcaster 2 drops its connector and broadcasts (entry still present, listeners
+    * not called);</li>
+    * <li>after sleeping for the timeout and re-broadcasting, the dropped connector is
+    * gone (listeners called);</li>
+    * <li>broadcasters 1 and 3 drop their connectors; after another sleep and broadcast
+    * round no entries remain (listeners called);</li>
+    * <li>a further broadcast round leaves the entries empty (listeners not called).</li>
+    * </ol>
+    * NOTE(review): the groups are only stopped at the end of the method, so a failing
+    * assertion leaves them running - consider moving the stop() calls into a finally block.
+    *
+    * @throws Exception if a group cannot be started, broadcast or stopped
+    */
+   public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
+   {
+      final int timeout = 500;
+
+      String node1 = RandomUtil.randomString();
+      String node2 = RandomUtil.randomString();
+      String node3 = RandomUtil.randomString();
+
+      // All three broadcast groups share the same parameter map (same file, same channel)
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+      connectors1.add(live1);
+      BroadcastGroupConfiguration broadcastConf1 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors1);
+      BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(), true, broadcastConf1);
+      bg1.start();
+
+      TransportConfiguration live2 = generateTC();
+      List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+      connectors2.add(live2);
+      BroadcastGroupConfiguration broadcastConf2 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors2);
+      BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(), true, broadcastConf2);
+      bg2.start();
+
+      TransportConfiguration live3 = generateTC();
+      List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+      connectors3.add(live3);
+      BroadcastGroupConfiguration broadcastConf3 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors3);
+      BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(), true, broadcastConf3);
+      bg3.start();
+
+      DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                        RandomUtil.randomString(),
+                                                        channelName,
+                                                        Thread.currentThread().getContextClassLoader().getResource(config),
+                                                        timeout);
+
+      MyListener listener1 = new MyListener();
+      dg.registerListener(listener1);
+      MyListener listener2 = new MyListener();
+      dg.registerListener(listener2);
+
+      dg.start();
+
+      // Phase 1: first broadcast from each group adds one entry and notifies the listeners
+      bg1.broadcastConnectors();
+      boolean ok = dg.waitForBroadcast(1000);
+      Assert.assertTrue(ok);
+      List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+      Assert.assertTrue(listener1.called);
+      Assert.assertTrue(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      bg2.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      Assert.assertTrue(ok);
+      entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1, live2), entries);
+      Assert.assertTrue(listener1.called);
+      Assert.assertTrue(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      bg3.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      Assert.assertTrue(ok);
+      entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+      Assert.assertTrue(listener1.called);
+      Assert.assertTrue(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      // Phase 2: re-broadcasting unchanged connectors must not notify the listeners
+      bg1.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      Assert.assertTrue(ok);
+      entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+      Assert.assertFalse(listener1.called);
+      Assert.assertFalse(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      bg2.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      Assert.assertTrue(ok);
+      entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+      Assert.assertFalse(listener1.called);
+      Assert.assertFalse(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      bg3.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      Assert.assertTrue(ok);
+      entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+      Assert.assertFalse(listener1.called);
+      Assert.assertFalse(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      // Phase 3: broadcaster 2 drops its connector and broadcasts again
+      bg2.removeConnector(live2);
+      bg2.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      Assert.assertTrue(ok);
+
+      // Connector2 should still be there since not timed out yet
+
+      entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+      Assert.assertFalse(listener1.called);
+      Assert.assertFalse(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      // Phase 4: wait for the discovery timeout to elapse, then run a full broadcast round;
+      // live2 is now expected to have disappeared and the listeners to have been notified
+      Thread.sleep(timeout);
+
+      // NOTE(review): from here on the result of waitForBroadcast is stored in 'ok' but
+      // never asserted - confirm whether that is intentional
+      bg1.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      bg2.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      bg3.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+
+      entries = dg.getDiscoveryEntries();
+      assertEqualsDiscoveryEntries(Arrays.asList(live1, live3), entries);
+      Assert.assertTrue(listener1.called);
+      Assert.assertTrue(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      // Phase 5: drop the remaining connectors, wait for the timeout and broadcast again;
+      // no entries are expected to remain and the listeners must have been notified
+      bg1.removeConnector(live1);
+      bg3.removeConnector(live3);
+
+      Thread.sleep(timeout);
+
+      bg1.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      bg2.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      bg3.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+
+      entries = dg.getDiscoveryEntries();
+      Assert.assertNotNull(entries);
+      Assert.assertEquals(0, entries.size());
+      Assert.assertTrue(listener1.called);
+      Assert.assertTrue(listener2.called);
+      listener1.called = false;
+      listener2.called = false;
+
+      // Phase 6: another round with nothing to announce leaves the entries empty and
+      // must not notify the listeners
+      bg1.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      bg2.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+      bg3.broadcastConnectors();
+      ok = dg.waitForBroadcast(1000);
+
+      entries = dg.getDiscoveryEntries();
+      Assert.assertNotNull(entries);
+      Assert.assertEquals(0, entries.size());
+      Assert.assertFalse(listener1.called);
+      Assert.assertFalse(listener2.called);
+
+      bg1.stop();
+      bg2.stop();
+      bg3.stop();
+
+      dg.stop();
+   }
+
+   /**
+    * Verifies that a single broadcast is received by several discovery groups listening
+    * on the same configuration file and channel name: after one broadcast, each of the
+    * three discovery groups must report exactly the broadcast connector.
+    * <p>
+    * All groups are stopped in a finally block so that a failing assertion does not
+    * leave them running for the tests that follow.
+    *
+    * @throws Exception if a group cannot be started, broadcast or stopped
+    */
+   public void testMultipleDiscoveryGroups() throws Exception
+   {
+      final int timeout = 500;
+
+      String nodeID = RandomUtil.randomString();
+
+      Map<String,Object> params = new HashMap<String,Object>();
+      params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+      TransportConfiguration live1 = generateTC();
+      List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+      connectors.add(live1);
+      BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+      bg.start();
+
+      // Three independent discovery groups, all on the broadcaster's channel
+      DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                         RandomUtil.randomString(),
+                                                         channelName,
+                                                         Thread.currentThread().getContextClassLoader().getResource(config),
+                                                         timeout);
+
+      DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                         RandomUtil.randomString(),
+                                                         channelName,
+                                                         Thread.currentThread().getContextClassLoader().getResource(config),
+                                                         timeout);
+
+      DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                         RandomUtil.randomString(),
+                                                         channelName,
+                                                         Thread.currentThread().getContextClassLoader().getResource(config),
+                                                         timeout);
+
+      dg1.start();
+      dg2.start();
+      dg3.start();
+
+      try
+      {
+         bg.broadcastConnectors();
+
+         boolean ok = dg1.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+         List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+         assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+         ok = dg2.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+         entries = dg2.getDiscoveryEntries();
+         assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+         ok = dg3.waitForBroadcast(1000);
+         Assert.assertTrue(ok);
+         entries = dg3.getDiscoveryEntries();
+         assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+      }
+      finally
+      {
+         // Same stop order as before, but now also executed when an assertion fails
+         bg.stop();
+
+         dg1.stop();
+         dg2.stop();
+         dg3.stop();
+      }
+   }
+
+   /**
+    * Checks the notifications emitted by a discovery group: none before start, one
+    * DISCOVERY_GROUP_STARTED after start() and one DISCOVERY_GROUP_STOPPED after stop(),
+    * each carrying the group's name in its "name" property.
+    *
+    * @throws Exception if the discovery group cannot be started or stopped
+    */
+   public void testDiscoveryGroupNotifications() throws Exception
+   {
+      final int timeout = 500;
+      final SimpleString nameKey = new SimpleString("name");
+
+      SimpleNotificationService service = new SimpleNotificationService();
+      SimpleNotificationService.Listener recorder = new SimpleNotificationService.Listener();
+      service.addNotificationListener(recorder);
+
+      DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+                                                        RandomUtil.randomString(),
+                                                        channelName,
+                                                        Thread.currentThread().getContextClassLoader().getResource(config),
+                                                        timeout);
+      dg.setNotificationService(service);
+
+      // Nothing has been emitted before the group is started
+      Assert.assertEquals(0, recorder.getNotifications().size());
+
+      dg.start();
+
+      // Starting emits exactly one notification, tagged with the group's name
+      Assert.assertEquals(1, recorder.getNotifications().size());
+      Notification started = recorder.getNotifications().get(0);
+      Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STARTED, started.getType());
+      String startedName = started.getProperties().getSimpleStringProperty(nameKey).toString();
+      Assert.assertEquals(dg.getName(), startedName);
+
+      dg.stop();
+
+      // Stopping emits a second notification, again tagged with the group's name
+      Assert.assertEquals(2, recorder.getNotifications().size());
+      Notification stopped = recorder.getNotifications().get(1);
+      Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STOPPED, stopped.getType());
+      String stoppedName = stopped.getProperties().getSimpleStringProperty(nameKey).toString();
+      Assert.assertEquals(dg.getName(), stoppedName);
+   }
+
+   /**
+    * Checks the notifications emitted by a broadcast group: none before start, one
+    * BROADCAST_GROUP_STARTED after start() and one BROADCAST_GROUP_STOPPED after stop(),
+    * each carrying the group's name in its "name" property. The group is configured
+    * with the configuration file only and an empty connector list.
+    *
+    * @throws Exception if the broadcast group cannot be started or stopped
+    */
+   public void testBroadcastGroupNotifications() throws Exception
+   {
+      final SimpleString nameKey = new SimpleString("name");
+
+      SimpleNotificationService service = new SimpleNotificationService();
+      SimpleNotificationService.Listener recorder = new SimpleNotificationService.Listener();
+      service.addNotificationListener(recorder);
+
+      Map<String,Object> groupParams = new HashMap<String,Object>();
+      groupParams.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+      List<TransportConfiguration> noConnectors = new ArrayList<TransportConfiguration>();
+      BroadcastGroupConfiguration groupConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(),
+                                                                              groupParams,
+                                                                              RandomUtil.randomString(),
+                                                                              noConnectors);
+      String nodeID = RandomUtil.randomString();
+      BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, groupConf.getName(), true, groupConf);
+      bg.setNotificationService(service);
+
+      // Nothing has been emitted before the group is started
+      Assert.assertEquals(0, recorder.getNotifications().size());
+
+      bg.start();
+
+      // Starting emits exactly one notification, tagged with the group's name
+      Assert.assertEquals(1, recorder.getNotifications().size());
+      Notification started = recorder.getNotifications().get(0);
+      Assert.assertEquals(NotificationType.BROADCAST_GROUP_STARTED, started.getType());
+      String startedName = started.getProperties().getSimpleStringProperty(nameKey).toString();
+      Assert.assertEquals(bg.getName(), startedName);
+
+      bg.stop();
+
+      // Stopping emits a second notification, again tagged with the group's name
+      Assert.assertEquals(2, recorder.getNotifications().size());
+      Notification stopped = recorder.getNotifications().get(1);
+      Assert.assertEquals(NotificationType.BROADCAST_GROUP_STOPPED, stopped.getType());
+      String stoppedName = stopped.getProperties().getSimpleStringProperty(nameKey).toString();
+      Assert.assertEquals(bg.getName(), stoppedName);
+   }
+
+   /**
+    * Builds a throw-away transport configuration for the tests. The class name
+    * ("org.foo.bar." plus a generated id), the connector name and the three parameter
+    * keys all come from {@code UUIDGenerator}; the parameters hold an int, a generated
+    * string and a boolean value.
+    *
+    * @return a new transport configuration with generated identifiers
+    */
+   private TransportConfiguration generateTC()
+   {
+      UUIDGenerator uuids = UUIDGenerator.getInstance();
+
+      String factoryClassName = "org.foo.bar." + uuids.generateStringUUID();
+      String connectorName = uuids.generateStringUUID();
+
+      Map<String, Object> settings = new HashMap<String, Object>();
+      settings.put(uuids.generateStringUUID(), 123);
+      settings.put(uuids.generateStringUUID(), uuids.generateStringUUID());
+      settings.put(uuids.generateStringUUID(), true);
+
+      return new TransportConfiguration(factoryClassName, settings, connectorName);
+   }
+
+   /**
+    * Discovery listener that only records whether {@code connectorsChanged()} was invoked.
+    */
+   private static class MyListener implements DiscoveryListener
+   {
+      /** Set to true by connectorsChanged(); the tests read it and reset it to false. */
+      volatile boolean called = false;
+
+      public void connectorsChanged()
+      {
+         this.called = true;
+      }
+   }
+   
+   
+   /**
+    * Asserts that the connectors of the discovered entries equal the expected transport
+    * configurations, ignoring order. Copies of both lists are sorted in descending order
+    * of the connector's {@code toString()} and then compared element by element; the
+    * lists passed in are not modified.
+    *
+    * @param expected the transport configurations that should have been discovered
+    * @param actual the entries reported by the discovery group; must not be null
+    */
+   private static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
+   {
+      assertNotNull(actual);
+
+      // Descending order of the textual form of the configuration
+      final Comparator<TransportConfiguration> byDescendingText = new Comparator<TransportConfiguration>()
+      {
+         public int compare(TransportConfiguration left, TransportConfiguration right)
+         {
+            return right.toString().compareTo(left.toString());
+         }
+      };
+      // Same ordering, applied to the connector carried by each entry
+      Comparator<DiscoveryEntry> byDescendingConnectorText = new Comparator<DiscoveryEntry>()
+      {
+         public int compare(DiscoveryEntry left, DiscoveryEntry right)
+         {
+            return byDescendingText.compare(left.getConnector(), right.getConnector());
+         }
+      };
+
+      List<TransportConfiguration> wanted = new ArrayList<TransportConfiguration>(expected);
+      Collections.sort(wanted, byDescendingText);
+      List<DiscoveryEntry> found = new ArrayList<DiscoveryEntry>(actual);
+      Collections.sort(found, byDescendingConnectorText);
+
+      assertEquals(wanted.size(), found.size());
+      int position = 0;
+      for (TransportConfiguration connector : wanted)
+      {
+         assertEquals(connector, found.get(position).getConnector());
+         position++;
+      }
+   }
+
+}


Property changes on: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
___________________________________________________________________
Added: svn:mime-type
   + text/plain



More information about the hornetq-commits mailing list