[hornetq-commits] JBoss hornetq SVN: r10196 - in branches/HORNETQ-316: src/main/org/hornetq/integration and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Feb 10 05:25:47 EST 2011
Author: igarashitm
Date: 2011-02-10 05:25:47 -0500 (Thu, 10 Feb 2011)
New Revision: 10196
Added:
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
Modified:
branches/HORNETQ-316/build-hornetq.xml
branches/HORNETQ-316/pom.xml
branches/HORNETQ-316/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
added jgroups discovery plugin
Modified: branches/HORNETQ-316/build-hornetq.xml
===================================================================
--- branches/HORNETQ-316/build-hornetq.xml 2011-02-10 02:23:25 UTC (rev 10195)
+++ branches/HORNETQ-316/build-hornetq.xml 2011-02-10 10:25:47 UTC (rev 10196)
@@ -69,6 +69,8 @@
<property name="spring.integration.sources.jar.name" value="hornetq-spring-integration-sources.jar"/>
<property name="twitter.integration.jar.name" value="hornetq-twitter-integration.jar"/>
<property name="twitter.integration.sources.jar.name" value="hornetq-twitter-integration-sources.jar"/>
+ <property name="jgroups.discovery.jar.name" value="hornetq-jgroups-discovery.jar"/>
+ <property name="jgroups.discovery.sources.jar.name" value="hornetq-jgroups-discovery.jar"/>
<property name="bootstrap.jar.name" value="hornetq-bootstrap.jar"/>
<property name="bootstrap.sources.jar.name" value="hornetq-bootstrap-sources.jar"/>
<property name="logging.jar.name" value="hornetq-logging.jar"/>
@@ -87,6 +89,7 @@
<property name="resources.jar.name" value="hornetq-resources.jar"/>
<property name="resources.sources.jar.name" value="hornetq-resources-sources.jar"/>
<property name="twitter4j.jar.name" value="twitter4j-core.jar"/>
+ <property name="jgroups.jar.name" value="jgroups.jar"/>
<property name="eap.examples.zip.name" value="hornetq-eap-examples.zip"/>
<!--source and build dirs-->
@@ -99,6 +102,7 @@
<property name="build.jboss.integration.classes.dir" value="${build.dir}/classes/jboss-integration"/>
<property name="build.spring.integration.classes.dir" value="${build.dir}/classes/spring-integration"/>
<property name="build.twitter.integration.classes.dir" value="${build.dir}/classes/twitter-integration"/>
+ <property name="build.jgroups.discovery.classes.dir" value="${build.dir}/classes/jgroups-discovery"/>
<property name="build.service.classes.dir" value="${build.dir}/classes/service"/>
<property name="build.bootstrap.classes.dir" value="${build.dir}/classes/bootstrap"/>
<property name="build.logging.classes.dir" value="${build.dir}/classes/logging"/>
@@ -223,6 +227,11 @@
<path refid="org.twitter4j.classpath"/>
</path>
+ <path id="jgroups.discovery.compilation.classpath">
+ <path location="${build.core.classes.dir}"/>
+ <path refid="jgroups.jgroups.classpath"/>
+ </path>
+
<path id="spring.integration.compilation.classpath">
<path location="${build.core.classes.dir}"/>
<path location="${build.jms.classes.dir}"/>
@@ -259,6 +268,7 @@
<path refid="bootstrap.compilation.classpath"/>
<path refid="junit.junit.classpath"/>
<path refid="org.twitter4j.classpath"/>
+ <path refid="jgroups.jgroups.classpath"/>
<path refid="org.springframework.classpath"/>
<path location="${build.jars.dir}/${ra.jar.name}"/>
<path location="${build.jars.dir}/${jms.jar.name}"/>
@@ -267,6 +277,7 @@
<path location="${build.jars.dir}/${logging.jar.name}"/>
<path location="${build.jars.dir}/${spring.integration.jar.name}"/>
<path location="${build.jars.dir}/${twitter.integration.jar.name}"/>
+ <path location="${build.jars.dir}/${jgroups.discovery.jar.name}"/>
</path>
<path id="jms.test.compilation.classpath">
@@ -321,6 +332,7 @@
<path refid="apache.logging.classpath"/>
<path refid="org.springframework.classpath"/>
<path refid="org.twitter4j.classpath"/>
+ <path refid="jgroups.jgroups.classpath"/>
</path>
<path id="emma.unit.test.execution.classpath">
@@ -403,6 +415,7 @@
<mkdir dir="${build.jboss.integration.classes.dir}"/>
<mkdir dir="${build.spring.integration.classes.dir}"/>
<mkdir dir="${build.twitter.integration.classes.dir}"/>
+ <mkdir dir="${build.jgroups.discovery.classes.dir}"/>
<mkdir dir="${build.service.classes.dir}"/>
<mkdir dir="${build.bootstrap.classes.dir}"/>
<mkdir dir="${build.logging.classes.dir}"/>
@@ -606,6 +619,27 @@
</javac>
</target>
+ <target name="compile-jgroups-discovery" depends="compile-core">
+ <javac destdir="${build.jgroups.discovery.classes.dir}"
+ target="${javac.target}"
+ source="${javac.source}"
+ optimize="${javac.optimize}"
+ debug="${javac.debug}"
+ depend="${javac.depend}"
+ verbose="${javac.verbose}"
+ deprecation="${javac.deprecation}"
+ includeAntRuntime="${javac.include.ant.runtime}"
+ includeJavaRuntime="${javac.include.java.runtime}"
+ encoding="${javac.encoding}"
+ failonerror="${javac.fail.onerror}">
+ <src>
+ <pathelement path="${src.main.dir}"/>
+ </src>
+ <include name="org/hornetq/integration/discovery/jgroups/**/*.java"/>
+ <classpath refid="jgroups.discovery.compilation.classpath"/>
+ </javac>
+ </target>
+
<target name="compile-spring-integration" depends="compile-core">
<javac destdir="${build.spring.integration.classes.dir}"
target="${javac.target}"
@@ -771,11 +805,11 @@
<!-- ======================================================================================== -->
<target name="sources-jar" description="create jar files containing source code"
- depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources, jar-spring-integration-sources">
+ depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources, jar-jgroups-discovery-sources, jar-spring-integration-sources">
</target>
<target name="jar"
- depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration, jar-spring-integration, jar-rest">
+ depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration, jar-jgroups-discovery, jar-spring-integration, jar-rest">
</target>
<target name="jar-jnp-client" depends="init">
@@ -935,6 +969,24 @@
</jar>
</target>
+ <target name="jar-jgroups-discovery" depends="compile-jgroups-discovery">
+
+ <jar jarfile="${build.jars.dir}/${jgroups.discovery.jar.name}">
+ <fileset dir="${build.jgroups.discovery.classes.dir}" includes="**"/>
+ </jar>
+
+ </target>
+
+ <target name="jar-jgroups-discovery-sources">
+
+ <jar jarfile="${build.jars.dir}/${jgroups.discovery.sources.jar.name}">
+ <fileset dir="${src.main.dir}">
+ <include name="org/hornetq/integration/discovery/jgroups/**/*.java"/>
+ </fileset>
+ </jar>
+
+ </target>
+
<target name="jar-spring-integration" depends="compile-spring-integration">
<jar jarfile="${build.jars.dir}/${spring.integration.jar.name}">
@@ -1267,6 +1319,7 @@
<include name="${jnp.client.jar.name}"/>
<include name="${spring.integration.jar.name}"/>
<include name="${twitter.integration.jar.name}"/>
+ <include name="${jgroups.discovery.jar.name}"/>
</fileset>
<fileset dir="${org.jboss.naming.lib}">
<include name="jnpserver.jar"/>
@@ -1277,6 +1330,7 @@
</copy>
<copy file="${org.jboss.netty.lib}/${netty.jar.name}" tofile="${build.distro.lib.dir}/netty.jar"/>
<copy file="${org.twitter4j.lib}/${twitter4j.jar.name}" tofile="${build.distro.lib.dir}/${twitter4j.jar.name}"/>
+ <copy file="${jgroups.jgroups.lib}/${jgroups.jar.name}" tofile="${build.distro.lib.dir}/${jgroups.jar.name}"/>
<copy todir="${build.distro.config.dir}">
<fileset dir="${src.config.dir}">
<include name="*.xml"/>
Modified: branches/HORNETQ-316/pom.xml
===================================================================
--- branches/HORNETQ-316/pom.xml 2011-02-10 02:23:25 UTC (rev 10195)
+++ branches/HORNETQ-316/pom.xml 2011-02-10 10:25:47 UTC (rev 10196)
@@ -255,6 +255,12 @@
<artifactId>twitter4j-core</artifactId>
<version>2.1.6</version>
</dependency>
+ <!-- needed to compile jgroups discovery support-->
+ <dependency>
+ <groupId>jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ <version>2.3</version>
+ </dependency>
<!-- needed to compile the tests-->
<dependency>
<groupId>junit</groupId>
Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java 2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+/**
+ * A BroadcastGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class BroadcastGroupConstants
+{
+ public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-file";
+ public static final Object BROADCAST_PERIOD_NAME = "broadcast-period";
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java 2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+/**
+ * A DiscoveryGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class DiscoveryGroupConstants
+{
+ public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-filename";
+ public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+ public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}
Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java 2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.ConfigurationHelper;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUIDGenerator;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+
+/**
+ * A JGroupsBroadcastGroupImpl
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class JGroupsBroadcastGroupImpl implements BroadcastGroup, Runnable
+{
+ private static final Logger log = Logger.getLogger(JGroupsBroadcastGroupImpl.class);
+
+ private final String nodeID;
+
+ private final String name;
+
+ private final BroadcastGroupConfiguration broadcastGroupConfiguration;
+
+ private final List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+
+ private String jgroupsConfigurationFileName;
+
+ private JChannel broadcastChannel;
+
+ private boolean started;
+
+ private ScheduledFuture<?> future;
+
+ private boolean active;
+
+ // Each broadcast group has a unique id - we use this to detect when more than one group broadcasts the same node id
+ // on the network which would be an error
+ private final String uniqueID;
+
+ private NotificationService notificationService;
+
+ public JGroupsBroadcastGroupImpl(final String nodeID,
+ final String name,
+ final boolean active,
+ final BroadcastGroupConfiguration config)
+ {
+ this.nodeID = nodeID;
+
+ this.name = name;
+
+ this.active = active;
+
+ this.broadcastGroupConfiguration = config;
+
+ uniqueID = UUIDGenerator.getInstance().generateStringUUID();
+ }
+
+ public void setNotificationService(NotificationService notificationService)
+ {
+ this.notificationService = notificationService;
+ }
+
+ public void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
+ this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
+
+ this.broadcastChannel = new JChannel(Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName));
+
+ this.broadcastChannel.connect(this.name);
+
+ started = true;
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+ Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STARTED, props);
+ notificationService.sendNotification(notification);
+ }
+ }
+
+ public void stop() throws Exception
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ if (future != null)
+ {
+ future.cancel(false);
+ }
+
+ started = false;
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+ Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STOPPED, props);
+ try
+ {
+ notificationService.sendNotification(notification);
+ }
+ catch (Exception e)
+ {
+ JGroupsBroadcastGroupImpl.log.warn("unable to send notification when broadcast group is stopped", e);
+ }
+ }
+
+ }
+
+ public boolean isStarted()
+ {
+ return this.started;
+ }
+
+ public String getName()
+ {
+ return this.name;
+ }
+
+ public void addConnector(TransportConfiguration tcConfig)
+ {
+ this.connectors.add(tcConfig);
+ }
+
+ public void removeConnector(TransportConfiguration tcConfig)
+ {
+ this.connectors.remove(tcConfig);
+ }
+
+ public int size()
+ {
+ return this.connectors.size();
+ }
+
+ public void activate()
+ {
+ this.active = true;
+ }
+
+ public void broadcastConnectors() throws Exception
+ {
+ if (!active)
+ {
+ return;
+ }
+
+ HornetQBuffer buff = HornetQBuffers.dynamicBuffer(4096);
+
+ buff.writeString(nodeID);
+
+ buff.writeString(uniqueID);
+
+ buff.writeInt(connectors.size());
+
+ for (TransportConfiguration tcConfig : connectors)
+ {
+ tcConfig.encode(buff);
+ }
+
+ byte[] data = buff.toByteBuffer().array();
+
+ Message msg = new Message();
+
+ msg.setBuffer(data);
+
+ this.broadcastChannel.send(msg);
+ }
+
+ public void run()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ try
+ {
+ broadcastConnectors();
+ }
+ catch (Exception e)
+ {
+ JGroupsBroadcastGroupImpl.log.error("Failed to broadcast connector configs", e);
+ }
+ }
+
+ public void schedule(ScheduledExecutorService scheduler)
+ {
+ Map<String,Object> params = broadcastGroupConfiguration.getParams();
+
+ this.future = scheduler.scheduleWithFixedDelay(this,
+ 0L,
+ Long.parseLong((String)params.get(BroadcastGroupConstants.BROADCAST_PERIOD_NAME)),
+ TimeUnit.MILLISECONDS);
+ }
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java 2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,358 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.TypedProperties;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.ReceiverAdapter;
+
+/**
+ * A JGroupsDiscoveryGroupImpl
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class JGroupsDiscoveryGroupImpl extends ReceiverAdapter implements DiscoveryGroup
+{
+ private static final Logger log = Logger.getLogger(JGroupsDiscoveryGroupImpl.class);
+
+ private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+ private final String name;
+
+ private final URL configURL;
+
+ private final String nodeID;
+
+ private volatile boolean started;
+
+ private boolean received;
+
+ private final Object waitLock = new Object();
+
+ private final Map<String, DiscoveryEntry> connectors = new HashMap<String, DiscoveryEntry>();
+
+ private final long timeout;
+
+ private final Map<String, String> uniqueIDMap = new HashMap<String, String>();
+
+ private JChannel discoveryChannel;
+
+ private NotificationService notificationService;
+
+ public JGroupsDiscoveryGroupImpl(final String nodeID,
+ final String name,
+ final URL confURL,
+ final long timeout)
+ {
+ this.nodeID = nodeID;
+ this.name = name;
+ this.configURL = confURL;
+ this.timeout = timeout;
+ }
+
+ public void setNotificationService(NotificationService notificationService)
+ {
+ this.notificationService = notificationService;
+ }
+
+ public void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ try
+ {
+ this.discoveryChannel = new JChannel(configURL);
+
+ this.discoveryChannel.setReceiver(this);
+
+ this.discoveryChannel.connect(this.name);
+ }
+ catch(Exception e)
+ {
+ log.error("Failed to join jgroups channel", e);
+ return;
+ }
+
+ started = true;
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+
+ props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+
+ Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props);
+
+ notificationService.sendNotification(notification);
+ }
+ }
+
+ public void stop() throws Exception
+ {
+ synchronized (this)
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ started = false;
+ }
+
+ synchronized (waitLock)
+ {
+ waitLock.notify();
+ }
+
+ this.discoveryChannel.shutdown();
+
+ this.discoveryChannel = null;
+
+ if (notificationService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+ Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STOPPED, props);
+ try
+ {
+ notificationService.sendNotification(notification);
+ }
+ catch (Exception e)
+ {
+ JGroupsDiscoveryGroupImpl.log.warn("unable to send notification when discovery group is stopped", e);
+ }
+ }
+ }
+
+ public String getName()
+ {
+ return this.name;
+ }
+
+ public List<DiscoveryEntry> getDiscoveryEntries()
+ {
+ List<DiscoveryEntry> list = new ArrayList<DiscoveryEntry>();
+
+ list.addAll(connectors.values());
+
+ return list;
+ }
+
+ public boolean isStarted()
+ {
+ return this.started;
+ }
+
+ public boolean waitForBroadcast(long timeout)
+ {
+ synchronized (waitLock)
+ {
+ long start = System.currentTimeMillis();
+
+ long toWait = timeout;
+
+ while (started && !received && (toWait > 0 || timeout == 0))
+ {
+ try
+ {
+ waitLock.wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ if (timeout != 0)
+ {
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+ }
+
+ boolean ret = received;
+
+ received = false;
+
+ return ret;
+ }
+ }
+
+ @Override
+ public void receive(Message msg)
+ {
+ if(!started)
+ {
+ return;
+ }
+
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(msg.getBuffer());
+
+ String originatingNodeID = buffer.readString();
+
+ String uniqueID = buffer.readString();
+
+ checkUniqueID(originatingNodeID, uniqueID);
+
+ if (nodeID.equals(originatingNodeID))
+ {
+ if (checkExpiration())
+ {
+ callListeners();
+ }
+
+ // Ignore traffic from own node
+ return;
+ }
+
+ int size = buffer.readInt();
+
+ boolean changed = false;
+
+ synchronized (this)
+ {
+ for (int i = 0; i < size; i++)
+ {
+ TransportConfiguration connector = new TransportConfiguration();
+
+ connector.decode(buffer);
+
+ DiscoveryEntry entry = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
+
+ DiscoveryEntry oldVal = connectors.put(originatingNodeID, entry);
+
+ if (oldVal == null)
+ {
+ changed = true;
+ }
+ }
+
+ changed = changed || checkExpiration();
+ }
+
+ if (changed)
+ {
+ callListeners();
+ }
+
+ synchronized (waitLock)
+ {
+ received = true;
+
+ waitLock.notify();
+ }
+ }
+
+ public void registerListener(DiscoveryListener listener)
+ {
+ listeners.add(listener);
+
+ if (!connectors.isEmpty())
+ {
+ listener.connectorsChanged();
+ }
+ }
+
+ public void unregisterListener(DiscoveryListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ private void callListeners()
+ {
+ for (DiscoveryListener listener : listeners)
+ {
+ try
+ {
+ listener.connectorsChanged();
+ }
+ catch (Throwable t)
+ {
+ // Catch it so exception doesn't prevent other listeners from running
+ JGroupsDiscoveryGroupImpl.log.error("Failed to call discovery listener", t);
+ }
+ }
+ }
+
+ private void checkUniqueID(final String originatingNodeID, final String uniqueID)
+ {
+ String currentUniqueID = uniqueIDMap.get(originatingNodeID);
+
+ if (currentUniqueID == null)
+ {
+ uniqueIDMap.put(originatingNodeID, uniqueID);
+ }
+ else
+ {
+ if (!currentUniqueID.equals(uniqueID))
+ {
+ log.warn("There are more than one servers on the network broadcasting the same node id. " + "You will see this message exactly once (per node) if a node is restarted, in which case it can be safely "
+ + "ignored. But if it is logged continuously it means you really do have more than one node on the same network "
+ + "active concurrently with the same node id. This could occur if you have a backup node active at the same time as "
+ + "its live node. nodeID=" + originatingNodeID);
+ uniqueIDMap.put(originatingNodeID, uniqueID);
+ }
+ }
+ }
+
+ private boolean checkExpiration()
+ {
+ boolean changed = false;
+ long now = System.currentTimeMillis();
+
+ Iterator<Map.Entry<String, DiscoveryEntry>> iter = connectors.entrySet().iterator();
+
+ // Weed out any expired connectors
+
+ while (iter.hasNext())
+ {
+ Map.Entry<String, DiscoveryEntry> entry = iter.next();
+
+ if (entry.getValue().getLastUpdate() + timeout <= now)
+ {
+ iter.remove();
+
+ changed = true;
+ }
+ }
+
+ return changed;
+ }
+
+}
Property changes on: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java 2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,333 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.AbstractServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A JGroupsServerLocatorImpl
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class JGroupsServerLocatorImpl extends AbstractServerLocator implements DiscoveryListener
+{
+ private static final long serialVersionUID = 1720602999991968346L;
+
+ private static final Logger log = Logger.getLogger(JGroupsServerLocatorImpl.class);
+
+ private String discoveryGroupName;
+
+ private String jgroupsConfigurationFileName;
+
+ private long initialWaitTimeout;
+
+ private long refreshTimeout;
+
+ private DiscoveryGroup discoveryGroup;
+
+ private volatile boolean closing;
+
+ private synchronized void initialise() throws Exception {
+ if (!isReadOnly())
+ {
+ setThreadPools();
+
+ instantiateLoadBalancingPolicy();
+
+ this.discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+
+ Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+
+ this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
+
+ this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
+
+ this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
+
+ this.discoveryGroup = new JGroupsDiscoveryGroupImpl(getNodeID(),
+ this.discoveryGroupName,
+ Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName),
+ this.refreshTimeout);
+
+ this.discoveryGroup.registerListener(this);
+
+ this.discoveryGroup.start();
+
+ setReadOnly(true);
+ }
+ }
+
+ public JGroupsServerLocatorImpl(boolean useHA, DiscoveryGroupConfiguration discoveryGroupConfiguration)
+ {
+ super(useHA, discoveryGroupConfiguration);
+ }
+
+ public void start(Executor executor) throws Exception
+ {
+ initialise();
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ if (!closing)
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
+ }
+ }
+ });
+ }
+
+ public ClientSessionFactory connect() throws Exception
+ {
+ ClientSessionFactoryInternal sf;
+
+ // wait for discovery group to get the list of initial connectors
+ sf = (ClientSessionFactoryInternal)createSessionFactory();
+
+ addFactory(sf);
+ return sf;
+ }
+
+ public ClientSessionFactory createSessionFactory() throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ if (getInitialConnectors() == null)
+ {
+ // Wait for an initial broadcast to give us at least one node in the cluster
+ long timeout = isClusterConnection() ? 0 : this.initialWaitTimeout;
+ boolean ok = discoveryGroup.waitForBroadcast(timeout);
+
+ if (!ok)
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive initial broadcast from cluster");
+ }
+ }
+
+ ClientSessionFactoryInternal factory = null;
+
+ synchronized (this)
+ {
+ boolean retry;
+ int attempts = 0;
+ do
+ {
+ retry = false;
+
+ TransportConfiguration tc = selectConnector();
+
+ // try each factory in the list until we find one which works
+
+ try
+ {
+ factory = new ClientSessionFactoryImpl(this,
+ tc,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+ factory.connect(getInitialConnectAttempts(), isFailoverOnInitialConnection());
+ }
+ catch (HornetQException e)
+ {
+ factory.close();
+ factory = null;
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ attempts++;
+
+ if (attempts == getConnectorLength())
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED,
+ "Cannot connect to server(s). Tried with all available servers.");
+ }
+ retry = true;
+ }
+ else
+ {
+ throw e;
+ }
+ }
+ }
+ while (retry);
+
+ if (isHA())
+ {
+ long toWait = 30000;
+ long start = System.currentTimeMillis();
+ while (!isReceivedTopology() && toWait > 0)
+ {
+ // Now wait for the topology
+
+ try
+ {
+ wait(toWait);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ if (toWait <= 0)
+ {
+ throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+ "Timed out waiting to receive cluster topology");
+ }
+ }
+
+ addFactory(factory);
+
+ return factory;
+ }
+ }
+
+ public ClientSessionFactory createSessionFactory(TransportConfiguration transportConfiguration) throws Exception
+ {
+ if (isClosed())
+ {
+ throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+ transportConfiguration,
+ getCallTimeout(),
+ getClientFailureCheckPeriod(),
+ getConnectionTTL(),
+ getRetryInterval(),
+ getRetryIntervalMultiplier(),
+ getMaxRetryInterval(),
+ getReconnectAttempts(),
+ getThreadPool(),
+ getScheduledThreadPool(),
+ getInterceptors());
+
+ factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+
+ addFactory(factory);
+
+ return factory;
+ }
+
+ public void close()
+ {
+ if (isClosed())
+ {
+ return;
+ }
+
+ closing = true;
+
+ try
+ {
+ this.discoveryGroup.stop();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to stop discovery group", e);
+ }
+
+ super.close();
+ }
+
+ public synchronized void connectorsChanged()
+ {
+ List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
+
+ TransportConfiguration[] initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+ newConnectors.size());
+ int count = 0;
+ for (DiscoveryEntry entry : newConnectors)
+ {
+ initialConnectors[count++] = entry.getConnector();
+ }
+
+ if (isHA() && isClusterConnection() && !isReceivedTopology() && initialConnectors.length > 0)
+ {
+ // FIXME the node is alone in the cluster. We create a connection to the new node
+ // to trigger the node notification to form the cluster.
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+ setInitialConnectors(initialConnectors);
+ }
+}
Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2011-02-10 02:23:25 UTC (rev 10195)
+++ branches/HORNETQ-316/src/main/org/hornetq/ra/ConnectionFactoryProperties.java 2011-02-10 10:25:47 UTC (rev 10196)
@@ -103,6 +103,8 @@
private Integer threadPoolMaxSize;
+ private Map<String, Object> discoveryPluginParameters;
+
/**
* @return the transportType
*/
@@ -122,6 +124,17 @@
hasBeenUpdated = true;
}
+ public Map<String, Object> getParsedDiscoveryPluginParameters()
+ {
+ return discoveryPluginParameters;
+ }
+
+ public void setParsedDiscoveryPluginParameters(final Map<String, Object> discoveryPluginParameters)
+ {
+ this.discoveryPluginParameters = discoveryPluginParameters;
+ hasBeenUpdated = true;
+ }
+
public void setConnectorClassName(final String value)
{
connectorClassName = value;
Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-02-10 02:23:25 UTC (rev 10195)
+++ branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2011-02-10 10:25:47 UTC (rev 10196)
@@ -85,6 +85,11 @@
private String unparsedProperties;
/**
+ * The discovery plugin properties for resource adapter before parsing
+ */
+ private String unparsedDiscoveryPluginProperties;
+
+ /**
* Have the factory been configured
*/
private final AtomicBoolean configured;
@@ -262,6 +267,20 @@
}
}
+ public String getDiscoveryPluginParameters()
+ {
+ return unparsedDiscoveryPluginProperties;
+ }
+
+ public void setDiscoveryPluginProperties(final String config)
+ {
+ if(config != null)
+ {
+ this.unparsedDiscoveryPluginProperties = config;
+ raProperties.setParsedDiscoveryPluginParameters(Util.parseConfig(config));
+ }
+ }
+
/**
* Get the discovery group name
*
@@ -1405,7 +1424,6 @@
}
else if (discoveryAddress != null)
{
- // FIXME make discovery stategy pluggable with configuration
Map<String,Object> params = new HashMap<String,Object>();
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
@@ -1433,6 +1451,25 @@
cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
}
}
+ else if (this.unparsedDiscoveryPluginProperties != null)
+ {
+ // for another discovery strategy
+ Map<String, Object> discoveryPluginParams =
+ overrideConnectionParameters(overrideProperties.getParsedDiscoveryPluginParameters(),raProperties.getParsedDiscoveryPluginParameters());
+
+ String serverLocatorClassName = (String)discoveryPluginParams.get("server-locator-class");
+
+ DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(serverLocatorClassName, discoveryPluginParams, null);
+
+ if (ha)
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
+ }
+ else
+ {
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+ }
+ }
else
{
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for HornetQ ResourceAdapter Connection Factory");
More information about the hornetq-commits
mailing list