JBoss hornetq SVN: r10440 - in branches/HORNETQ-316: src/main/org/hornetq/core/client/impl and 4 other directories.
by do-not-reply@jboss.org
Author: igarashitm
Date: 2011-04-03 05:50:09 -0400 (Sun, 03 Apr 2011)
New Revision: 10440
Added:
branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
Modified:
branches/HORNETQ-316/pom.xml
branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
Log:
added JGroupsDiscoveryTest & made it passed.
Modified: branches/HORNETQ-316/pom.xml
===================================================================
--- branches/HORNETQ-316/pom.xml 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/pom.xml 2011-04-03 09:50:09 UTC (rev 10440)
@@ -259,7 +259,7 @@
<dependency>
<groupId>jgroups</groupId>
<artifactId>jgroups</artifactId>
- <version>2.3</version>
+ <version>2.6.15.GA</version>
</dependency>
<!-- needed to compile the tests-->
<dependency>
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/client/impl/SimpleUDPServerLocatorImpl.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -86,8 +86,12 @@
{
this.localBindAddress = null;
}
-
- this.groupAddress = InetAddress.getByName(ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params));
+
+ String gaddr = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.GROUP_ADDRESS_NAME, null, params);
+ if(gaddr != null)
+ {
+ this.groupAddress = InetAddress.getByName(gaddr);
+ }
this.groupPort = ConfigurationHelper.getIntProperty(DiscoveryGroupConstants.GROUP_PORT_NAME, -1, params);
this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
Modified: branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -34,6 +34,7 @@
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -105,8 +106,8 @@
}
Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
- int localPort = Integer.parseInt((String)params.get(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME));
- String localAddr = (String)params.get(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME);
+ int localPort = ConfigurationHelper.getIntProperty(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME, -1, params);
+ String localAddr = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME, null, params);
InetAddress localAddress = null;
if(localAddr!=null)
@@ -225,11 +226,10 @@
byte[] data = buff.toByteBuffer().array();
Map<String,Object> params = broadcastGroupConfiguration.getParams();
- int groupPort = Integer.parseInt((String)params.get(BroadcastGroupConstants.GROUP_PORT_NAME));
- String groupAddr = (String)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
- InetAddress groupAddress = InetAddress.getByName(groupAddr);
+ Integer groupPort = (Integer)params.get(BroadcastGroupConstants.GROUP_PORT_NAME);
+ InetAddress groupAddr = (InetAddress)params.get(BroadcastGroupConstants.GROUP_ADDRESS_NAME);
- DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
+ DatagramPacket packet = new DatagramPacket(data, data.length, groupAddr, groupPort);
socket.send(packet);
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -23,5 +23,8 @@
public class BroadcastGroupConstants
{
public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-file";
- public static final Object BROADCAST_PERIOD_NAME = "broadcast-period";
+ public static final String BROADCAST_PERIOD_NAME = "broadcast-period";
+ public static final String JGROUPS_CHANNEL_NAME_NAME = "jgroups-channel-name";
+
+ public static final String DEFAULT_JGROUPS_CHANNEL_NAME = "hornetq-jgroups-channel";
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -25,4 +25,7 @@
public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-filename";
public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+ public static final String JGROUPS_CHANNEL_NAME_NAME = "jgroups-channel-name";
+
+ public static final String DEFAULT_JGROUPS_CHANNEL_NAME = "hornetq-jgroups-channel";
}
Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -13,7 +13,6 @@
package org.hornetq.integration.discovery.jgroups;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
@@ -53,10 +52,12 @@
private final BroadcastGroupConfiguration broadcastGroupConfiguration;
- private final List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ private final List<TransportConfiguration> connectors;
private String jgroupsConfigurationFileName;
+ private String jgroupsChannelName = null;
+
private JChannel broadcastChannel;
private boolean started;
@@ -84,6 +85,8 @@
this.broadcastGroupConfiguration = config;
+ this.connectors = config.getConnectorList();
+
uniqueID = UUIDGenerator.getInstance().generateStringUUID();
}
@@ -100,11 +103,11 @@
}
Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
- this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
-
+ this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
+ this.jgroupsChannelName = ConfigurationHelper.getStringProperty(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, BroadcastGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME, params);
this.broadcastChannel = new JChannel(Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName));
- this.broadcastChannel.connect(this.name);
+ this.broadcastChannel.connect(this.jgroupsChannelName);
started = true;
@@ -127,7 +130,15 @@
if (future != null)
{
future.cancel(false);
+ future = null;
}
+
+ if (broadcastChannel != null)
+ {
+ broadcastChannel.shutdown();
+ broadcastChannel.close();
+ broadcastChannel = null;
+ }
started = false;
Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -51,6 +51,8 @@
private final String name;
+ private final String jgroupsChannelName;
+
private final URL configURL;
private final String nodeID;
@@ -73,11 +75,13 @@
public JGroupsDiscoveryGroupImpl(final String nodeID,
final String name,
+ final String channelName,
final URL confURL,
final long timeout)
{
this.nodeID = nodeID;
this.name = name;
+ this.jgroupsChannelName = channelName;
this.configURL = confURL;
this.timeout = timeout;
}
@@ -100,7 +104,7 @@
this.discoveryChannel.setReceiver(this);
- this.discoveryChannel.connect(this.name);
+ this.discoveryChannel.connect(this.jgroupsChannelName);
}
catch(Exception e)
{
@@ -141,6 +145,8 @@
this.discoveryChannel.shutdown();
+ this.discoveryChannel.close();
+
this.discoveryChannel = null;
if (notificationService != null)
@@ -164,6 +170,11 @@
return this.name;
}
+ public String getJGroupsChannelName()
+ {
+ return this.jgroupsChannelName;
+ }
+
public List<DiscoveryEntry> getDiscoveryEntries()
{
List<DiscoveryEntry> list = new ArrayList<DiscoveryEntry>();
Modified: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -50,6 +50,8 @@
private String jgroupsConfigurationFileName;
+ private String jgroupsChannelName;
+
private long initialWaitTimeout;
private long refreshTimeout;
@@ -69,6 +71,7 @@
Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+ this.jgroupsChannelName = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CHANNEL_NAME_NAME, DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME, params);
this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
@@ -77,6 +80,7 @@
this.discoveryGroup = new JGroupsDiscoveryGroupImpl(getNodeID(),
this.discoveryGroupName,
+ this.jgroupsChannelName,
Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName),
this.refreshTimeout);
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml 2011-04-03 09:50:09 UTC (rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml 2011-04-03 09:50:09 UTC (rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir_2"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_2.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
===================================================================
--- branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml (rev 0)
+++ branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml 2011-04-03 09:50:09 UTC (rev 10440)
@@ -0,0 +1,53 @@
+<config xmlns="urn:org:jgroups"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
+ <TCP loopback="true"
+ recv_buf_size="20000000"
+ send_buf_size="640000"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ enable_bundling="true"
+ use_send_queues="false"
+ sock_conn_timeout="300"
+ skip_suspected_members="true"
+
+ thread_pool.enabled="true"
+ thread_pool.min_threads="1"
+ thread_pool.max_threads="10"
+ thread_pool.keep_alive_time="5000"
+ thread_pool.queue_enabled="false"
+ thread_pool.queue_max_size="100"
+ thread_pool.rejection_policy="run"
+
+ oob_thread_pool.enabled="true"
+ oob_thread_pool.min_threads="1"
+ oob_thread_pool.max_threads="8"
+ oob_thread_pool.keep_alive_time="5000"
+ oob_thread_pool.queue_enabled="false"
+ oob_thread_pool.queue_max_size="100"
+ oob_thread_pool.rejection_policy="run"/>
+
+ <FILE_PING location="file_ping_dir_3"/>
+ <MERGE2 max_interval="30000"
+ min_interval="10000"/>
+ <FD_SOCK/>
+ <FD timeout="10000" max_tries="5" />
+ <VERIFY_SUSPECT timeout="1500" />
+ <BARRIER />
+ <pbcast.NAKACK
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200" />
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000"
+
+ view_bundling="true"/>
+ <FC max_credits="2000000"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" />
+ <pbcast.STREAMING_STATE_TRANSFER/>
+ <pbcast.FLUSH timeout="0"/>
+</config>
Property changes on: branches/HORNETQ-316/tests/config/test-jgroups-file_ping_3.xml
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2011-04-02 03:53:14 UTC (rev 10439)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -155,10 +155,10 @@
return;
}
- log.info("Local address is " + localAddress);
+ log.info("Local address is " + localAddress.getHostAddress());
Map<String,Object> params = new HashMap<String,Object>();
- params.put(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME, localAddress);
+ params.put(BroadcastGroupConstants.LOCAL_BIND_ADDRESS_NAME, localAddress.getHostAddress());
params.put(BroadcastGroupConstants.LOCAL_BIND_PORT_NAME, 6552);
params.put(BroadcastGroupConstants.GROUP_ADDRESS_NAME, groupAddress);
params.put(BroadcastGroupConstants.GROUP_PORT_NAME, groupPort);
Added: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
===================================================================
--- branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java (rev 0)
+++ branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java 2011-04-03 09:50:09 UTC (rev 10440)
@@ -0,0 +1,719 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.discovery;
+
+ import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.integration.discovery.jgroups.BroadcastGroupConstants;
+import org.hornetq.integration.discovery.jgroups.DiscoveryGroupConstants;
+import org.hornetq.integration.discovery.jgroups.JGroupsBroadcastGroupImpl;
+import org.hornetq.integration.discovery.jgroups.JGroupsDiscoveryGroupImpl;
+import org.hornetq.tests.integration.SimpleNotificationService;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * A DiscoveryTest
+ *
+ * @author <a href="mailto:tm.igarashi@gmail.com">Tomohisa Igarashi</a>
+ *
+ */
+public class JGroupsDiscoveryTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(DiscoveryTest.class);
+
+ private static final String channelName = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME;
+
+ private static final String channelName2 = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "2";
+
+ private static final String channelName3 = DiscoveryGroupConstants.DEFAULT_JGROUPS_CHANNEL_NAME + "3";
+
+ private static final String config = "test-jgroups-file_ping.xml";
+
+ private static final String config2 = "test-jgroups-file_ping_2.xml";
+
+ private static final String config3 = "test-jgroups-file_ping_3.xml";
+
+ public void testSimpleBroadcast() throws Exception
+ {
+ final long timeout = 500;
+
+ final String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(10000);
+
+ Assert.assertTrue(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+ public void testSimpleBroadcastWithStopStartDiscoveryGroup() throws Exception
+ {
+ final int timeout = 500;
+
+ final String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ Assert.assertTrue(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+
+ dg.start();
+
+ bg.start();
+
+ bg.broadcastConnectors();
+
+ ok = dg.waitForBroadcast(1000);
+
+ Assert.assertTrue(ok);
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testIgnoreTrafficFromOwnNode() throws Exception
+ {
+ final int timeout = 500;
+
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(nodeID,
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg.waitForBroadcast(1000);
+
+ Assert.assertFalse(ok);
+
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+
+ Assert.assertNotNull(entries);
+
+ Assert.assertEquals(0, entries.size());
+
+ bg.stop();
+
+ dg.stop();
+
+ }
+
+
+ public void testMultipleGroups() throws Exception
+ {
+ final int timeout = 500;
+
+ String node1 = RandomUtil.randomString();
+
+ String node2 = RandomUtil.randomString();
+
+ String node3 = RandomUtil.randomString();
+
+ Map<String,Object> params1 = new HashMap<String,Object>();
+ params1.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params1.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+ connectors1.add(live1);
+ BroadcastGroupConfiguration broadcastConf1 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params1, RandomUtil.randomString(), connectors1);
+ BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(), true, broadcastConf1);
+ bg1.start();
+
+ Map<String,Object> params2 = new HashMap<String,Object>();
+ params2.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config2);
+ params2.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName2);
+ TransportConfiguration live2 = generateTC();
+ List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+ connectors2.add(live2);
+ BroadcastGroupConfiguration broadcastConf2 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params2, RandomUtil.randomString(), connectors2);
+ BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(), true, broadcastConf2);
+ bg2.start();
+
+ Map<String,Object> params3 = new HashMap<String,Object>();
+ params3.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config3);
+ params3.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName3);
+ TransportConfiguration live3 = generateTC();
+ List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+ connectors3.add(live3);
+ BroadcastGroupConfiguration broadcastConf3 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params3, RandomUtil.randomString(), connectors3);
+ BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(), true, broadcastConf3);
+ bg3.start();
+
+ DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+ dg1.start();
+
+ DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName2,
+ Thread.currentThread().getContextClassLoader().getResource(config2),
+ timeout);
+ dg2.start();
+
+ DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName3,
+ Thread.currentThread().getContextClassLoader().getResource(config3),
+ timeout);
+ dg3.start();
+
+ bg1.broadcastConnectors();
+
+ bg2.broadcastConnectors();
+
+ bg3.broadcastConnectors();
+
+ boolean ok = dg1.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ ok = dg2.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg2.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live2), entries);
+
+ ok = dg3.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg3.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live3), entries);
+
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg1.stop();
+ dg2.stop();
+ dg3.stop();
+ }
+
+ public void testDiscoveryListenersCalled() throws Exception
+ {
+ final int timeout = 500;
+
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+
+ bg.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ MyListener listener1 = new MyListener();
+ MyListener listener2 = new MyListener();
+ MyListener listener3 = new MyListener();
+
+ dg.registerListener(listener1);
+ dg.registerListener(listener2);
+ dg.registerListener(listener3);
+
+ dg.start();
+
+ bg.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ Assert.assertTrue(listener3.called);
+
+ listener1.called = false;
+ listener2.called = false;
+ listener3.called = false;
+
+ bg.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ // Won't be called since connectors haven't changed
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ Assert.assertFalse(listener3.called);
+
+ bg.stop();
+
+ dg.stop();
+ }
+
+ public void testConnectorsUpdatedMultipleBroadcasters() throws Exception
+ {
+ final int timeout = 500;
+
+ String node1 = RandomUtil.randomString();
+ String node2 = RandomUtil.randomString();
+ String node3 = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors1 = new ArrayList<TransportConfiguration>();
+ connectors1.add(live1);
+ BroadcastGroupConfiguration broadcastConf1 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors1);
+ BroadcastGroup bg1 = new JGroupsBroadcastGroupImpl(node1, broadcastConf1.getName(), true, broadcastConf1);
+ bg1.start();
+
+ TransportConfiguration live2 = generateTC();
+ List<TransportConfiguration> connectors2 = new ArrayList<TransportConfiguration>();
+ connectors2.add(live2);
+ BroadcastGroupConfiguration broadcastConf2 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors2);
+ BroadcastGroup bg2 = new JGroupsBroadcastGroupImpl(node2, broadcastConf2.getName(), true, broadcastConf2);
+ bg2.start();
+
+ TransportConfiguration live3 = generateTC();
+ List<TransportConfiguration> connectors3 = new ArrayList<TransportConfiguration>();
+ connectors3.add(live3);
+ BroadcastGroupConfiguration broadcastConf3 = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors3);
+ BroadcastGroup bg3 = new JGroupsBroadcastGroupImpl(node3, broadcastConf3.getName(), true, broadcastConf3);
+ bg3.start();
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ MyListener listener1 = new MyListener();
+ dg.registerListener(listener1);
+ MyListener listener2 = new MyListener();
+ dg.registerListener(listener2);
+
+ dg.start();
+
+ bg1.broadcastConnectors();
+ boolean ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg2.removeConnector(live2);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+
+ // Connector2 should still be there since not timed out yet
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live2, live3), entries);
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1, live3), entries);
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.removeConnector(live1);
+ bg3.removeConnector(live3);
+
+ Thread.sleep(timeout);
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ Assert.assertNotNull(entries);
+ Assert.assertEquals(0, entries.size());
+ Assert.assertTrue(listener1.called);
+ Assert.assertTrue(listener2.called);
+ listener1.called = false;
+ listener2.called = false;
+
+ bg1.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg2.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+ bg3.broadcastConnectors();
+ ok = dg.waitForBroadcast(1000);
+
+ entries = dg.getDiscoveryEntries();
+ Assert.assertNotNull(entries);
+ Assert.assertEquals(0, entries.size());
+ Assert.assertFalse(listener1.called);
+ Assert.assertFalse(listener2.called);
+
+ bg1.stop();
+ bg2.stop();
+ bg3.stop();
+
+ dg.stop();
+ }
+
+ public void testMultipleDiscoveryGroups() throws Exception
+ {
+ final int timeout = 500;
+
+ String nodeID = RandomUtil.randomString();
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ params.put(BroadcastGroupConstants.JGROUPS_CHANNEL_NAME_NAME, channelName);
+ TransportConfiguration live1 = generateTC();
+ List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+ connectors.add(live1);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), connectors);
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(nodeID, broadcastConf.getName(), true, broadcastConf);
+ bg.start();
+
+ DiscoveryGroup dg1 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ DiscoveryGroup dg2 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ DiscoveryGroup dg3 = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+
+ dg1.start();
+ dg2.start();
+ dg3.start();
+
+ bg.broadcastConnectors();
+
+ boolean ok = dg1.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ List<DiscoveryEntry> entries = dg1.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ ok = dg2.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg2.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ ok = dg3.waitForBroadcast(1000);
+ Assert.assertTrue(ok);
+ entries = dg3.getDiscoveryEntries();
+ assertEqualsDiscoveryEntries(Arrays.asList(live1), entries);
+
+ bg.stop();
+
+ dg1.stop();
+ dg2.stop();
+ dg3.stop();
+ }
+
+ public void testDiscoveryGroupNotifications() throws Exception
+ {
+ SimpleNotificationService notifService = new SimpleNotificationService();
+ SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+ notifService.addNotificationListener(notifListener);
+
+ final int timeout = 500;
+
+ DiscoveryGroup dg = new JGroupsDiscoveryGroupImpl(RandomUtil.randomString(),
+ RandomUtil.randomString(),
+ channelName,
+ Thread.currentThread().getContextClassLoader().getResource(config),
+ timeout);
+ dg.setNotificationService(notifService);
+
+ Assert.assertEquals(0, notifListener.getNotifications().size());
+
+ dg.start();
+
+ Assert.assertEquals(1, notifListener.getNotifications().size());
+ Notification notif = notifListener.getNotifications().get(0);
+ Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STARTED, notif.getType());
+ Assert.assertEquals(dg.getName(), notif.getProperties()
+ .getSimpleStringProperty(new SimpleString("name"))
+ .toString());
+
+ dg.stop();
+
+ Assert.assertEquals(2, notifListener.getNotifications().size());
+ notif = notifListener.getNotifications().get(1);
+ Assert.assertEquals(NotificationType.DISCOVERY_GROUP_STOPPED, notif.getType());
+ Assert.assertEquals(dg.getName(), notif.getProperties()
+ .getSimpleStringProperty(new SimpleString("name"))
+ .toString());
+ }
+
+ public void testBroadcastGroupNotifications() throws Exception
+ {
+ SimpleNotificationService notifService = new SimpleNotificationService();
+ SimpleNotificationService.Listener notifListener = new SimpleNotificationService.Listener();
+ notifService.addNotificationListener(notifListener);
+
+ Map<String,Object> params = new HashMap<String,Object>();
+ params.put(BroadcastGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, config);
+ BroadcastGroupConfiguration broadcastConf = new BroadcastGroupConfiguration(JGroupsBroadcastGroupImpl.class.getName(), params, RandomUtil.randomString(), new ArrayList<TransportConfiguration>());
+ BroadcastGroup bg = new JGroupsBroadcastGroupImpl(RandomUtil.randomString(), broadcastConf.getName(), true, broadcastConf);
+ bg.setNotificationService(notifService);
+
+ Assert.assertEquals(0, notifListener.getNotifications().size());
+
+ bg.start();
+
+ Assert.assertEquals(1, notifListener.getNotifications().size());
+ Notification notif = notifListener.getNotifications().get(0);
+ Assert.assertEquals(NotificationType.BROADCAST_GROUP_STARTED, notif.getType());
+ Assert.assertEquals(bg.getName(), notif.getProperties()
+ .getSimpleStringProperty(new SimpleString("name"))
+ .toString());
+
+ bg.stop();
+
+ Assert.assertEquals(2, notifListener.getNotifications().size());
+ notif = notifListener.getNotifications().get(1);
+ Assert.assertEquals(NotificationType.BROADCAST_GROUP_STOPPED, notif.getType());
+ Assert.assertEquals(bg.getName(), notif.getProperties()
+ .getSimpleStringProperty(new SimpleString("name"))
+ .toString());
+ }
+
+ private TransportConfiguration generateTC()
+ {
+ String className = "org.foo.bar." + UUIDGenerator.getInstance().generateStringUUID();
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
+ params.put(UUIDGenerator.getInstance().generateStringUUID(), true);
+ TransportConfiguration tc = new TransportConfiguration(className, params, name);
+ return tc;
+ }
+
+ private static class MyListener implements DiscoveryListener
+ {
+ volatile boolean called;
+
+ public void connectorsChanged()
+ {
+ called = true;
+ }
+ }
+
+
+ private static void assertEqualsDiscoveryEntries(List<TransportConfiguration> expected, List<DiscoveryEntry> actual)
+ {
+ assertNotNull(actual);
+
+ List<TransportConfiguration> sortedExpected = new ArrayList<TransportConfiguration>(expected);
+ Collections.sort(sortedExpected, new Comparator<TransportConfiguration>()
+ {
+
+ public int compare(TransportConfiguration o1, TransportConfiguration o2)
+ {
+ return o2.toString().compareTo(o1.toString());
+ }
+ });
+ List<DiscoveryEntry> sortedActual = new ArrayList<DiscoveryEntry>(actual);
+ Collections.sort(sortedActual, new Comparator<DiscoveryEntry>()
+ {
+ public int compare(DiscoveryEntry o1, DiscoveryEntry o2)
+ {
+ return o2.getConnector().toString().compareTo(o1.getConnector().toString());
+ }
+ });
+
+ assertEquals(sortedExpected.size(), sortedActual.size());
+ for (int i = 0; i < sortedExpected.size(); i++)
+ {
+ assertEquals(sortedExpected.get(i), sortedActual.get(i).getConnector());
+ }
+ }
+
+}
Property changes on: branches/HORNETQ-316/tests/src/org/hornetq/tests/integration/discovery/JGroupsDiscoveryTest.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
13 years, 11 months
JBoss hornetq SVN: r10439 - in trunk/src/main/org/hornetq/core: persistence/impl/journal and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-04-01 23:53:14 -0400 (Fri, 01 Apr 2011)
New Revision: 10439
Modified:
trunk/src/main/org/hornetq/core/paging/PrintPages.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Tool to Print Page Information - used to debug customer's data
Modified: trunk/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PrintPages.java 2011-04-01 22:16:46 UTC (rev 10438)
+++ trunk/src/main/org/hornetq/core/paging/PrintPages.java 2011-04-02 03:53:14 UTC (rev 10439)
@@ -13,16 +13,33 @@
package org.hornetq.core.paging;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.paging.cursor.PagePosition;
+import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.CursorAckRecordEncoding;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -48,29 +65,36 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
- public static void main(String arg[])
+
+ public static void main(final String arg[])
{
+ if (arg.length != 2)
+ {
+ System.err.println("Usage: PrintPages <page foler> <journal folder>");
+ }
try
{
+
+ Map<Long, Set<PagePosition>> cursorACKs = PrintPages.loadCursorACKs(arg[1]);
+
ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
final ExecutorService executor = Executors.newFixedThreadPool(10);
ExecutorFactory execfactory = new ExecutorFactory()
{
-
+
public Executor getExecutor()
{
return executor;
}
};
PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l, scheduled, execfactory, false);
- HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+ HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
addressSettingsRepository.setDefault(new AddressSettings());
StorageManager sm = new NullStorageManager();
PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
-
+
manager.start();
-
+
SimpleString stores[] = manager.getStoreNames();
for (SimpleString store : stores)
@@ -79,28 +103,55 @@
System.out.println("Exploring store " + store);
PagingStore pgStore = manager.getPageStore(store);
int pgid = (int)pgStore.getFirstPage();
- for (int pg = 0 ; pg < pgStore.getNumberOfPages(); pg++)
+ for (int pg = 0; pg < pgStore.getNumberOfPages(); pg++)
{
System.out.println("******* Page " + pgid);
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read();
page.close();
-
+
int msgID = 0;
-
+
for (PagedMessage msg : msgs)
{
msg.initMessage(sm);
- System.out.println("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+ System.out.print("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+ System.out.print(",Queues = ");
+ long q[] = msg.getQueueIDs();
+ for (int i = 0; i < q.length; i++)
+ {
+ System.out.print(q[i]);
+
+ PagePosition posCheck = new PagePositionImpl(pgid, msgID);
+
+ boolean acked = false;
+
+ Set<PagePosition> positions = cursorACKs.get(q[i]);
+ if (positions != null)
+ {
+ acked = positions.contains(posCheck);
+ }
+
+ if (acked)
+ {
+ System.out.print(" (ACK)");
+ }
+
+ if (i + 1 < q.length)
+ {
+ System.out.print(",");
+ }
+ }
+ System.out.println();
msgID++;
}
-
- pgid ++;
-
+
+ pgid++;
+
}
}
-
+
}
catch (Exception e)
{
@@ -108,6 +159,60 @@
}
}
+ /**
+ * @param journalLocation
+ * @return
+ * @throws Exception
+ */
+ protected static Map<Long, Set<PagePosition>> loadCursorACKs(final String journalLocation) throws Exception
+ {
+ SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journalLocation);
+
+ // Will use only default values. The load function should adapt to anything different
+ ConfigurationImpl defaultValues = new ConfigurationImpl();
+
+ JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
+ defaultValues.getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
+
+ messagesJournal.start();
+
+ ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
+ ArrayList<PreparedTransactionInfo> txs = new ArrayList<PreparedTransactionInfo>();
+
+ messagesJournal.load(records, txs, null);
+
+ Map<Long, Set<PagePosition>> cursorRecords = new HashMap<Long, Set<PagePosition>>();
+
+ for (RecordInfo record : records)
+ {
+ if (record.userRecordType == JournalStorageManager.ACKNOWLEDGE_CURSOR)
+ {
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+ CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+ encoding.decode(buff);
+
+ Set<PagePosition> set = cursorRecords.get(encoding.queueID);
+
+ if (set == null)
+ {
+ set = new HashSet<PagePosition>();
+ cursorRecords.put(encoding.queueID, set);
+ }
+
+ set.add(encoding.position);
+ }
+ }
+ return cursorRecords;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-01 22:16:46 UTC (rev 10438)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-02 03:53:14 UTC (rev 10439)
@@ -2696,7 +2696,7 @@
int deliveryCount;
}
- private static final class CursorAckRecordEncoding implements EncodingSupport
+ public static final class CursorAckRecordEncoding implements EncodingSupport
{
public CursorAckRecordEncoding(final long queueID, final PagePosition position)
{
@@ -2718,9 +2718,9 @@
return "CursorAckRecordEncoding [queueID=" + queueID + ", position=" + position + "]";
}
- long queueID;
+ public long queueID;
- PagePosition position;
+ public PagePosition position;
/* (non-Javadoc)
* @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
13 years, 11 months
JBoss hornetq SVN: r10438 - in trunk/src/main/org/hornetq/core: persistence/impl/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-04-01 18:16:46 -0400 (Fri, 01 Apr 2011)
New Revision: 10438
Added:
trunk/src/main/org/hornetq/core/paging/PrintPages.java
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
Log:
Adding tool to debug page files
Added: trunk/src/main/org/hornetq/core/paging/PrintPages.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PrintPages.java (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/PrintPages.java 2011-04-01 22:16:46 UTC (rev 10438)
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.utils.ExecutorFactory;
+
+/**
+ * A PrintPage
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PrintPages
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public static void main(String arg[])
+ {
+ try
+ {
+ ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
+ final ExecutorService executor = Executors.newFixedThreadPool(10);
+ ExecutorFactory execfactory = new ExecutorFactory()
+ {
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+ };
+ PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(arg[0], 1000l, scheduled, execfactory, false);
+ HierarchicalRepository<AddressSettings> addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+ addressSettingsRepository.setDefault(new AddressSettings());
+ StorageManager sm = new NullStorageManager();
+ PagingManager manager = new PagingManagerImpl(pageStoreFactory, sm, addressSettingsRepository);
+
+ manager.start();
+
+ SimpleString stores[] = manager.getStoreNames();
+
+ for (SimpleString store : stores)
+ {
+ System.out.println("####################################################################################################");
+ System.out.println("Exploring store " + store);
+ PagingStore pgStore = manager.getPageStore(store);
+ int pgid = (int)pgStore.getFirstPage();
+ for (int pg = 0 ; pg < pgStore.getNumberOfPages(); pg++)
+ {
+ System.out.println("******* Page " + pgid);
+ Page page = pgStore.createPage(pgid);
+ page.open();
+ List<PagedMessage> msgs = page.read();
+ page.close();
+
+ int msgID = 0;
+
+ for (PagedMessage msg : msgs)
+ {
+ msg.initMessage(sm);
+ System.out.println("pg=" + pg + ", msg=" + msgID + "=" + msg.getMessage());
+ msgID++;
+ }
+
+ pgid ++;
+
+ }
+ }
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-03-31 21:20:51 UTC (rev 10437)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-04-01 22:16:46 UTC (rev 10438)
@@ -288,6 +288,13 @@
return file;
}
+ @Override
+ public String toString()
+ {
+ return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-03-31 21:20:51 UTC (rev 10437)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-04-01 22:16:46 UTC (rev 10438)
@@ -151,6 +151,13 @@
return getHeadersAndPropertiesEncodeSize();
}
+ @Override
+ public String toString()
+ {
+ return "LargeServerMessage[messageID=" + messageID + ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]";
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
13 years, 11 months