[hornetq-commits] JBoss hornetq SVN: r10196 - in branches/HORNETQ-316: src/main/org/hornetq/integration and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Feb 10 05:25:47 EST 2011


Author: igarashitm
Date: 2011-02-10 05:25:47 -0500 (Thu, 10 Feb 2011)
New Revision: 10196

Added:
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
   branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
Modified:
   branches/HORNETQ-316/build-hornetq.xml
   branches/HORNETQ-316/pom.xml
   branches/HORNETQ-316/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
   branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
Log:
added jgroups discovery plugin


Modified: branches/HORNETQ-316/build-hornetq.xml
===================================================================
--- branches/HORNETQ-316/build-hornetq.xml	2011-02-10 02:23:25 UTC (rev 10195)
+++ branches/HORNETQ-316/build-hornetq.xml	2011-02-10 10:25:47 UTC (rev 10196)
@@ -69,6 +69,8 @@
    <property name="spring.integration.sources.jar.name" value="hornetq-spring-integration-sources.jar"/>
    <property name="twitter.integration.jar.name" value="hornetq-twitter-integration.jar"/>
    <property name="twitter.integration.sources.jar.name" value="hornetq-twitter-integration-sources.jar"/>
+   <property name="jgroups.discovery.jar.name" value="hornetq-jgroups-discovery.jar"/>
+   <property name="jgroups.discovery.sources.jar.name" value="hornetq-jgroups-discovery.jar"/>
    <property name="bootstrap.jar.name" value="hornetq-bootstrap.jar"/>
    <property name="bootstrap.sources.jar.name" value="hornetq-bootstrap-sources.jar"/>
    <property name="logging.jar.name" value="hornetq-logging.jar"/>
@@ -87,6 +89,7 @@
    <property name="resources.jar.name" value="hornetq-resources.jar"/>
    <property name="resources.sources.jar.name" value="hornetq-resources-sources.jar"/>
    <property name="twitter4j.jar.name" value="twitter4j-core.jar"/>
+   <property name="jgroups.jar.name" value="jgroups.jar"/>
    <property name="eap.examples.zip.name" value="hornetq-eap-examples.zip"/>
 
    <!--source and build dirs-->
@@ -99,6 +102,7 @@
    <property name="build.jboss.integration.classes.dir" value="${build.dir}/classes/jboss-integration"/>
    <property name="build.spring.integration.classes.dir" value="${build.dir}/classes/spring-integration"/>
    <property name="build.twitter.integration.classes.dir" value="${build.dir}/classes/twitter-integration"/>
+   <property name="build.jgroups.discovery.classes.dir" value="${build.dir}/classes/jgroups-discovery"/>
    <property name="build.service.classes.dir" value="${build.dir}/classes/service"/>
    <property name="build.bootstrap.classes.dir" value="${build.dir}/classes/bootstrap"/>
    <property name="build.logging.classes.dir" value="${build.dir}/classes/logging"/>
@@ -223,6 +227,11 @@
       <path refid="org.twitter4j.classpath"/>
    </path>
 
+   <path id="jgroups.discovery.compilation.classpath">
+      <path location="${build.core.classes.dir}"/>
+      <path refid="jgroups.jgroups.classpath"/>
+   </path>
+
    <path id="spring.integration.compilation.classpath">
       <path location="${build.core.classes.dir}"/>
       <path location="${build.jms.classes.dir}"/>
@@ -259,6 +268,7 @@
       <path refid="bootstrap.compilation.classpath"/>
       <path refid="junit.junit.classpath"/>
       <path refid="org.twitter4j.classpath"/>
+      <path refid="jgroups.jgroups.classpath"/>
       <path refid="org.springframework.classpath"/>
       <path location="${build.jars.dir}/${ra.jar.name}"/>
       <path location="${build.jars.dir}/${jms.jar.name}"/>
@@ -267,6 +277,7 @@
       <path location="${build.jars.dir}/${logging.jar.name}"/>
       <path location="${build.jars.dir}/${spring.integration.jar.name}"/>
       <path location="${build.jars.dir}/${twitter.integration.jar.name}"/>
+      <path location="${build.jars.dir}/${jgroups.discovery.jar.name}"/>
    </path>
 
    <path id="jms.test.compilation.classpath">
@@ -321,6 +332,7 @@
  	  <path refid="apache.logging.classpath"/>
       <path refid="org.springframework.classpath"/>
       <path refid="org.twitter4j.classpath"/>
+      <path refid="jgroups.jgroups.classpath"/>
    </path>
 
    <path id="emma.unit.test.execution.classpath">
@@ -403,6 +415,7 @@
       <mkdir dir="${build.jboss.integration.classes.dir}"/>
       <mkdir dir="${build.spring.integration.classes.dir}"/>
       <mkdir dir="${build.twitter.integration.classes.dir}"/>
+      <mkdir dir="${build.jgroups.discovery.classes.dir}"/>
       <mkdir dir="${build.service.classes.dir}"/>
       <mkdir dir="${build.bootstrap.classes.dir}"/>
       <mkdir dir="${build.logging.classes.dir}"/>
@@ -606,6 +619,27 @@
       </javac>
    </target>
 
+   <target name="compile-jgroups-discovery" depends="compile-core">
+      <javac destdir="${build.jgroups.discovery.classes.dir}"
+             target="${javac.target}"
+             source="${javac.source}"
+             optimize="${javac.optimize}"
+             debug="${javac.debug}"
+             depend="${javac.depend}"
+             verbose="${javac.verbose}"
+             deprecation="${javac.deprecation}"
+             includeAntRuntime="${javac.include.ant.runtime}"
+             includeJavaRuntime="${javac.include.java.runtime}"
+             encoding="${javac.encoding}"
+             failonerror="${javac.fail.onerror}">
+         <src>
+            <pathelement path="${src.main.dir}"/>
+         </src>
+         <include name="org/hornetq/integration/discovery/jgroups/**/*.java"/>
+         <classpath refid="jgroups.discovery.compilation.classpath"/>
+      </javac>
+   </target>
+
    <target name="compile-spring-integration" depends="compile-core">
       <javac destdir="${build.spring.integration.classes.dir}"
              target="${javac.target}"
@@ -771,11 +805,11 @@
    <!-- ======================================================================================== -->
 
    <target name="sources-jar" description="create jar files containing source code"
-           depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources, jar-spring-integration-sources">
+           depends="jar-core-sources, jar-core-client-sources, jar-core-client-java5-sources, jar-jms-sources, jar-jms-client-sources, jar-jms-client-java5-sources, jar-jboss-integration-sources, jar-jboss-service-sources, jar-bootstrap-sources, jar-logging-sources, jar-ra-sources, jar-resources-sources, jar-twitter-integration-sources, jar-jgroups-discovery-sources, jar-spring-integration-sources">
    </target>
 
    <target name="jar"
-           depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration, jar-spring-integration, jar-rest">
+           depends="jar-core, jar-core-client, jar-core-client-java5, jar-jms, jar-jms-client, jar-jms-client-java5, jar-jboss-integration, jar-jboss-service, jar-bootstrap, jar-logging, jar-ra, jar-mc, jar-jnp-client, jar-resources, sources-jar, jar-twitter-integration, jar-jgroups-discovery, jar-spring-integration, jar-rest">
    </target>
 
    <target name="jar-jnp-client" depends="init">
@@ -935,6 +969,24 @@
       </jar>
    </target>
 
+    <target name="jar-jgroups-discovery" depends="compile-jgroups-discovery">
+
+      <jar jarfile="${build.jars.dir}/${jgroups.discovery.jar.name}">
+         <fileset dir="${build.jgroups.discovery.classes.dir}" includes="**"/>
+      </jar>
+
+   </target>
+
+    <target name="jar-jgroups-discovery-sources">
+    
+      <jar jarfile="${build.jars.dir}/${jgroups.discovery.sources.jar.name}">
+         <fileset dir="${src.main.dir}">
+            <include name="org/hornetq/integration/discovery/jgroups/**/*.java"/>
+         </fileset>
+      </jar>
+
+   </target>
+
     <target name="jar-spring-integration" depends="compile-spring-integration">
 
       <jar jarfile="${build.jars.dir}/${spring.integration.jar.name}">
@@ -1267,6 +1319,7 @@
          	<include name="${jnp.client.jar.name}"/>
          	<include name="${spring.integration.jar.name}"/>
          	<include name="${twitter.integration.jar.name}"/>
+         	<include name="${jgroups.discovery.jar.name}"/>
          </fileset>
          <fileset dir="${org.jboss.naming.lib}">                                                   
             <include name="jnpserver.jar"/>
@@ -1277,6 +1330,7 @@
       </copy>
       <copy file="${org.jboss.netty.lib}/${netty.jar.name}" tofile="${build.distro.lib.dir}/netty.jar"/>
       <copy file="${org.twitter4j.lib}/${twitter4j.jar.name}" tofile="${build.distro.lib.dir}/${twitter4j.jar.name}"/>
+      <copy file="${jgroups.jgroups.lib}/${jgroups.jar.name}" tofile="${build.distro.lib.dir}/${jgroups.jar.name}"/>
       <copy todir="${build.distro.config.dir}">
          <fileset dir="${src.config.dir}">
             <include name="*.xml"/>

Modified: branches/HORNETQ-316/pom.xml
===================================================================
--- branches/HORNETQ-316/pom.xml	2011-02-10 02:23:25 UTC (rev 10195)
+++ branches/HORNETQ-316/pom.xml	2011-02-10 10:25:47 UTC (rev 10196)
@@ -255,6 +255,12 @@
            <artifactId>twitter4j-core</artifactId>
            <version>2.1.6</version>
        </dependency>
+      <!-- needed to compile jgroups discovery support-->
+      <dependency>
+         <groupId>jgroups</groupId>
+         <artifactId>jgroups</artifactId>
+         <version>2.3</version>
+      </dependency>
       <!-- needed to compile the tests-->
       <dependency>
         <groupId>junit</groupId>

Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java	2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+/**
+ * A BroadcastGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class BroadcastGroupConstants
+{
+   public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-file";
+   public static final Object BROADCAST_PERIOD_NAME = "broadcast-period";
+}


Property changes on: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/BroadcastGroupConstants.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/DiscoveryGroupConstants.java	2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+/**
+ * A DiscoveryGroupConstants
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class DiscoveryGroupConstants
+{
+   public static final String JGROUPS_CONFIGURATION_FILE_NAME = "jgroups-configuration-filename";
+   public static final String INITIAL_WAIT_TIMEOUT_NAME = "initial-wait-timeout";
+   public static final String REFRESH_TIMEOUT_NAME = "refresh-timeout";
+}

Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java	2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.config.BroadcastGroupConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.ConfigurationHelper;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUIDGenerator;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+
+/**
+ * A JGroupsBroadcastGroupImpl
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class JGroupsBroadcastGroupImpl implements BroadcastGroup, Runnable
+{
+   private static final Logger log = Logger.getLogger(JGroupsBroadcastGroupImpl.class);
+
+   private final String nodeID;
+
+   private final String name;
+
+   private final BroadcastGroupConfiguration broadcastGroupConfiguration;
+   
+   private final List<TransportConfiguration> connectors = new ArrayList<TransportConfiguration>();
+
+   private String jgroupsConfigurationFileName;
+   
+   private JChannel broadcastChannel;
+   
+   private boolean started;
+
+   private ScheduledFuture<?> future;
+
+   private boolean active;
+
+   // Each broadcast group has a unique id - we use this to detect when more than one group broadcasts the same node id
+   // on the network which would be an error
+   private final String uniqueID;
+
+   private NotificationService notificationService;
+
+   public JGroupsBroadcastGroupImpl(final String nodeID,
+                                    final String name,
+                                    final boolean active,
+                                    final BroadcastGroupConfiguration config)
+   {
+      this.nodeID = nodeID;
+
+      this.name = name;
+
+      this.active = active;
+
+      this.broadcastGroupConfiguration = config;
+      
+      uniqueID = UUIDGenerator.getInstance().generateStringUUID();
+   }
+   
+   public void setNotificationService(NotificationService notificationService)
+   {
+      this.notificationService = notificationService;
+   }
+
+   public void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      Map<String,Object> params = this.broadcastGroupConfiguration.getParams();
+      this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
+
+      this.broadcastChannel = new JChannel(Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName));
+      
+      this.broadcastChannel.connect(this.name);
+      
+      started = true;
+
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+         Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STARTED, props);
+         notificationService.sendNotification(notification);
+      }
+   }
+
+   public void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      if (future != null)
+      {
+         future.cancel(false);
+      }
+
+      started = false;
+
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+         Notification notification = new Notification(nodeID, NotificationType.BROADCAST_GROUP_STOPPED, props);
+         try
+         {
+            notificationService.sendNotification(notification);
+         }
+         catch (Exception e)
+         {
+            JGroupsBroadcastGroupImpl.log.warn("unable to send notification when broadcast group is stopped", e);
+         }
+      }
+
+   }
+
+   public boolean isStarted()
+   {
+      return this.started;
+   }
+
+   public String getName()
+   {
+      return this.name;
+   }
+
+   public void addConnector(TransportConfiguration tcConfig)
+   {
+      this.connectors.add(tcConfig);
+   }
+
+   public void removeConnector(TransportConfiguration tcConfig)
+   {
+      this.connectors.remove(tcConfig);
+   }
+
+   public int size()
+   {
+      return this.connectors.size();
+   }
+
+   public void activate()
+   {
+      this.active = true;
+   }
+
+   public void broadcastConnectors() throws Exception
+   {
+      if (!active)
+      {
+         return;
+      }
+
+      HornetQBuffer buff = HornetQBuffers.dynamicBuffer(4096);
+
+      buff.writeString(nodeID);
+
+      buff.writeString(uniqueID);
+
+      buff.writeInt(connectors.size());
+
+      for (TransportConfiguration tcConfig : connectors)
+      {
+         tcConfig.encode(buff);
+      }
+
+      byte[] data = buff.toByteBuffer().array();
+
+      Message msg = new Message();
+
+      msg.setBuffer(data);
+
+      this.broadcastChannel.send(msg);
+   }
+
+   public void run()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      try
+      {
+         broadcastConnectors();
+      }
+      catch (Exception e)
+      {
+         JGroupsBroadcastGroupImpl.log.error("Failed to broadcast connector configs", e);
+      }
+   }
+
+   public void schedule(ScheduledExecutorService scheduler)
+   {
+      Map<String,Object> params = broadcastGroupConfiguration.getParams();
+      
+      this.future = scheduler.scheduleWithFixedDelay(this,
+                                                     0L,
+                                                     Long.parseLong((String)params.get(BroadcastGroupConstants.BROADCAST_PERIOD_NAME)),
+                                                     TimeUnit.MILLISECONDS);
+   }
+}


Property changes on: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsBroadcastGroupImpl.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java	2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,358 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.utils.TypedProperties;
+import org.jgroups.JChannel;
+import org.jgroups.Message;
+import org.jgroups.ReceiverAdapter;
+
+/**
+ * A JGroupsDiscoveryGroupImpl
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class JGroupsDiscoveryGroupImpl extends ReceiverAdapter implements DiscoveryGroup
+{
+   private static final Logger log = Logger.getLogger(JGroupsDiscoveryGroupImpl.class);
+
+   private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+   private final String name;
+   
+   private final URL configURL;
+   
+   private final String nodeID;
+
+   private volatile boolean started;
+
+   private boolean received;
+
+   private final Object waitLock = new Object();
+
+   private final Map<String, DiscoveryEntry> connectors = new HashMap<String, DiscoveryEntry>();
+
+   private final long timeout;
+
+   private final Map<String, String> uniqueIDMap = new HashMap<String, String>();
+
+   private JChannel discoveryChannel;
+   
+   private NotificationService notificationService;
+   
+   public JGroupsDiscoveryGroupImpl(final String nodeID,
+                                    final String name,
+                                    final URL confURL,
+                                    final long timeout)
+   {
+      this.nodeID = nodeID;
+      this.name = name;
+      this.configURL = confURL;
+      this.timeout = timeout;
+   }
+   
+   public void setNotificationService(NotificationService notificationService)
+   {
+      this.notificationService = notificationService;
+   }
+
+   public void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      try
+      {
+         this.discoveryChannel = new JChannel(configURL);
+
+         this.discoveryChannel.setReceiver(this);
+         
+         this.discoveryChannel.connect(this.name);
+      }
+      catch(Exception e)
+      {
+         log.error("Failed to join jgroups channel", e);
+         return;
+      }
+      
+      started = true;
+
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+
+         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+
+         Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STARTED, props);
+
+         notificationService.sendNotification(notification);
+      }
+   }
+
+   public void stop() throws Exception
+   {
+      synchronized (this)
+      {
+         if (!started)
+         {
+            return;
+         }
+
+         started = false;
+      }
+
+      synchronized (waitLock)
+      {
+         waitLock.notify();
+      }
+
+      this.discoveryChannel.shutdown();
+
+      this.discoveryChannel = null;
+
+      if (notificationService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
+         Notification notification = new Notification(nodeID, NotificationType.DISCOVERY_GROUP_STOPPED, props);
+         try
+         {
+            notificationService.sendNotification(notification);
+         }
+         catch (Exception e)
+         {
+            JGroupsDiscoveryGroupImpl.log.warn("unable to send notification when discovery group is stopped", e);
+         }
+      }
+   }
+
+   public String getName()
+   {
+      return this.name;
+   }
+
+   public List<DiscoveryEntry> getDiscoveryEntries()
+   {
+      List<DiscoveryEntry> list = new ArrayList<DiscoveryEntry>();
+      
+      list.addAll(connectors.values());
+      
+      return list;
+   }
+
+   public boolean isStarted()
+   {
+      return this.started;
+   }
+
+   public boolean waitForBroadcast(long timeout)
+   {
+      synchronized (waitLock)
+      {
+         long start = System.currentTimeMillis();
+
+         long toWait = timeout;
+
+         while (started && !received && (toWait > 0 || timeout == 0))
+         {
+            try
+            {
+               waitLock.wait(toWait);
+            }
+            catch (InterruptedException e)
+            {
+            }
+
+            if (timeout != 0)
+            {
+               long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+         }
+
+         boolean ret = received;
+
+         received = false;
+
+         return ret;
+      }
+   }
+
+   @Override
+   public void receive(Message msg)
+   {
+      if(!started)
+      {
+         return;
+      }
+
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(msg.getBuffer());
+
+      String originatingNodeID = buffer.readString();
+
+      String uniqueID = buffer.readString();
+
+      checkUniqueID(originatingNodeID, uniqueID);
+
+      if (nodeID.equals(originatingNodeID))
+      {
+         if (checkExpiration())
+         {
+            callListeners();
+         }
+         
+         // Ignore traffic from own node
+         return;
+      }
+
+      int size = buffer.readInt();
+
+      boolean changed = false;
+
+      synchronized (this)
+      {
+         for (int i = 0; i < size; i++)
+         {
+            TransportConfiguration connector = new TransportConfiguration();
+
+            connector.decode(buffer);
+           
+            DiscoveryEntry entry = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
+
+            DiscoveryEntry oldVal = connectors.put(originatingNodeID, entry);
+
+            if (oldVal == null)
+            {
+               changed = true;
+            }
+         }
+
+         changed = changed || checkExpiration();
+      }
+
+      if (changed)
+      {
+         callListeners();
+      }
+
+      synchronized (waitLock)
+      {
+         received = true;
+
+         waitLock.notify();
+      }
+   }
+   
+   public void registerListener(DiscoveryListener listener)
+   {
+      listeners.add(listener);
+
+      if (!connectors.isEmpty())
+      {
+         listener.connectorsChanged();
+      }
+   }
+
+   public void unregisterListener(DiscoveryListener listener)
+   {
+      listeners.remove(listener);
+   }
+
+   private void callListeners()
+   {
+      for (DiscoveryListener listener : listeners)
+      {
+         try
+         {
+            listener.connectorsChanged();
+         }
+         catch (Throwable t)
+         {
+            // Catch it so exception doesn't prevent other listeners from running
+            JGroupsDiscoveryGroupImpl.log.error("Failed to call discovery listener", t);
+         }
+      }
+   }
+   
+   private void checkUniqueID(final String originatingNodeID, final String uniqueID)
+   {
+      String currentUniqueID = uniqueIDMap.get(originatingNodeID);
+
+      if (currentUniqueID == null)
+      {
+         uniqueIDMap.put(originatingNodeID, uniqueID);
+      }
+      else
+      {
+         if (!currentUniqueID.equals(uniqueID))
+         {
+            log.warn("There are more than one servers on the network broadcasting the same node id. " + "You will see this message exactly once (per node) if a node is restarted, in which case it can be safely "
+                     + "ignored. But if it is logged continuously it means you really do have more than one node on the same network "
+                     + "active concurrently with the same node id. This could occur if you have a backup node active at the same time as "
+                     + "its live node. nodeID=" + originatingNodeID);
+            uniqueIDMap.put(originatingNodeID, uniqueID);
+         }
+      }
+   }
+
+   private boolean checkExpiration()
+   {
+      boolean changed = false;
+      long now = System.currentTimeMillis();
+
+      Iterator<Map.Entry<String, DiscoveryEntry>> iter = connectors.entrySet().iterator();
+
+      // Weed out any expired connectors
+
+      while (iter.hasNext())
+      {
+         Map.Entry<String, DiscoveryEntry> entry = iter.next();
+
+         if (entry.getValue().getLastUpdate() + timeout <= now)
+         {
+            iter.remove();
+
+            changed = true;
+         }
+      }
+      
+      return changed;
+   }
+
+}


Property changes on: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsDiscoveryGroupImpl.java
___________________________________________________________________
Name: svn:mime-type
   + text/plain

Added: branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java	                        (rev 0)
+++ branches/HORNETQ-316/src/main/org/hornetq/integration/discovery/jgroups/JGroupsServerLocatorImpl.java	2011-02-10 10:25:47 UTC (rev 10196)
@@ -0,0 +1,333 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.discovery.jgroups;
+
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import org.hornetq.api.core.DiscoveryGroupConfiguration;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.client.impl.AbstractServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.cluster.DiscoveryEntry;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.cluster.DiscoveryListener;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.ConfigurationHelper;
+
+/**
+ * A JGroupsServerLocatorImpl
+ *
+ * @author "<a href=\"tm.igarashi at gmail.com\">Tomohisa Igarashi</a>"
+ *
+ *
+ */
+public class JGroupsServerLocatorImpl extends AbstractServerLocator implements DiscoveryListener
+{
+   private static final long serialVersionUID = 1720602999991968346L;
+
+   private static final Logger log = Logger.getLogger(JGroupsServerLocatorImpl.class);
+
+   private String discoveryGroupName;
+   
+   private String jgroupsConfigurationFileName;
+
+   private long initialWaitTimeout;
+   
+   private long refreshTimeout;
+   
+   private DiscoveryGroup discoveryGroup;
+   
+   private volatile boolean closing;
+   
+   private synchronized void initialise() throws Exception {
+      if (!isReadOnly())
+      {
+         setThreadPools();
+
+         instantiateLoadBalancingPolicy();
+
+         this.discoveryGroupName = getDiscoveryGroupConfiguration().getName();
+         
+         Map<String,Object> params = getDiscoveryGroupConfiguration().getParams();
+
+         this.initialWaitTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.INITIAL_WAIT_TIMEOUT_NAME, HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, params);
+
+         this.refreshTimeout = ConfigurationHelper.getLongProperty(DiscoveryGroupConstants.REFRESH_TIMEOUT_NAME, ConfigurationImpl.DEFAULT_BROADCAST_REFRESH_TIMEOUT, params);
+
+         this.jgroupsConfigurationFileName = ConfigurationHelper.getStringProperty(DiscoveryGroupConstants.JGROUPS_CONFIGURATION_FILE_NAME, null, params);
+         
+         this.discoveryGroup = new JGroupsDiscoveryGroupImpl(getNodeID(),
+                                                             this.discoveryGroupName,
+                                                             Thread.currentThread().getContextClassLoader().getResource(this.jgroupsConfigurationFileName),
+                                                             this.refreshTimeout);
+         
+         this.discoveryGroup.registerListener(this);
+         
+         this.discoveryGroup.start();
+
+         setReadOnly(true);
+      }
+   }
+   
+   public JGroupsServerLocatorImpl(boolean useHA, DiscoveryGroupConfiguration discoveryGroupConfiguration)
+   {
+      super(useHA, discoveryGroupConfiguration);
+   }
+
+    public void start(Executor executor) throws Exception
+   {
+       initialise();
+
+       executor.execute(new Runnable()
+       {
+          public void run()
+          {
+             try
+             {
+                connect();
+             }
+             catch (Exception e)
+             {
+                if (!closing)
+                {
+                   log.warn("did not connect the cluster connection to other nodes", e);
+                }
+             }
+          }
+       });
+   }
+
+   public ClientSessionFactory connect() throws Exception
+   {
+      ClientSessionFactoryInternal sf;
+
+      // wait for discovery group to get the list of initial connectors
+      sf = (ClientSessionFactoryInternal)createSessionFactory();
+
+      addFactory(sf);
+      return sf;
+   }
+
+   public ClientSessionFactory createSessionFactory() throws Exception
+   {
+      if (isClosed())
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      if (getInitialConnectors() == null)
+      {
+         // Wait for an initial broadcast to give us at least one node in the cluster
+         long timeout = isClusterConnection() ? 0 : this.initialWaitTimeout;
+         boolean ok = discoveryGroup.waitForBroadcast(timeout);
+
+         if (!ok)
+         {
+            throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                       "Timed out waiting to receive initial broadcast from cluster");
+         }
+      }
+
+      ClientSessionFactoryInternal factory = null;
+
+      synchronized (this)
+      {
+         boolean retry;
+         int attempts = 0;
+         do
+         {
+            retry = false;
+
+            TransportConfiguration tc = selectConnector();
+
+            // try each factory in the list until we find one which works
+
+            try
+            {
+               factory = new ClientSessionFactoryImpl(this,
+                                                      tc,
+                                                      getCallTimeout(),
+                                                      getClientFailureCheckPeriod(),
+                                                      getConnectionTTL(),
+                                                      getRetryInterval(),
+                                                      getRetryIntervalMultiplier(),
+                                                      getMaxRetryInterval(),
+                                                      getReconnectAttempts(),
+                                                      getThreadPool(),
+                                                      getScheduledThreadPool(),
+                                                      getInterceptors());
+               factory.connect(getInitialConnectAttempts(), isFailoverOnInitialConnection());
+            }
+            catch (HornetQException e)
+            {
+               factory.close();
+               factory = null;
+               if (e.getCode() == HornetQException.NOT_CONNECTED)
+               {
+                  attempts++;
+
+                  if (attempts == getConnectorLength())
+                  {
+                     throw new HornetQException(HornetQException.NOT_CONNECTED,
+                                                "Cannot connect to server(s). Tried with all available servers.");
+                  }
+                  retry = true;
+               }
+               else
+               {
+                  throw e;
+               }
+            }
+         }
+         while (retry);
+
+         if (isHA())
+         {
+            long toWait = 30000;
+            long start = System.currentTimeMillis();
+            while (!isReceivedTopology() && toWait > 0)
+            {
+               // Now wait for the topology
+
+               try
+               {
+                  wait(toWait);
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+
+               long now = System.currentTimeMillis();
+
+               toWait -= now - start;
+
+               start = now;
+            }
+
+            if (toWait <= 0)
+            {
+               throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
+                                          "Timed out waiting to receive cluster topology");
+            }
+         }
+
+         addFactory(factory);
+
+         return factory;
+      }
+   }
+
+   public ClientSessionFactory createSessionFactory(TransportConfiguration transportConfiguration) throws Exception
+   {
+      if (isClosed())
+      {
+         throw new IllegalStateException("Cannot create session factory, server locator is closed (maybe it has been garbage collected)");
+      }
+
+      try
+      {
+         initialise();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+      }
+
+      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
+                                                                          transportConfiguration,
+                                                                          getCallTimeout(),
+                                                                          getClientFailureCheckPeriod(),
+                                                                          getConnectionTTL(),
+                                                                          getRetryInterval(),
+                                                                          getRetryIntervalMultiplier(),
+                                                                          getMaxRetryInterval(),
+                                                                          getReconnectAttempts(),
+                                                                          getThreadPool(),
+                                                                          getScheduledThreadPool(),
+                                                                          getInterceptors());
+
+      factory.connect(getReconnectAttempts(), isFailoverOnInitialConnection());
+
+      addFactory(factory);
+
+      return factory;
+   }
+
+   public void close()
+   {
+      if (isClosed())
+      {
+         return;
+      }
+
+      closing = true;
+
+      try
+      {
+         this.discoveryGroup.stop();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to stop discovery group", e);
+      }
+
+      super.close();
+   }
+   
+   public synchronized void connectorsChanged()
+   {
+      List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
+
+      TransportConfiguration[] initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+                                                                                               newConnectors.size());
+      int count = 0;
+      for (DiscoveryEntry entry : newConnectors)
+      {
+         initialConnectors[count++] = entry.getConnector();
+      }
+
+      if (isHA() && isClusterConnection() && !isReceivedTopology() && initialConnectors.length > 0)
+      {
+         // FIXME the node is alone in the cluster. We create a connection to the new node
+         // to trigger the node notification to form the cluster.
+         try
+         {
+            connect();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace(); // To change body of catch statement use File | Settings | File Templates.
+         }
+      }
+
+      setInitialConnectors(initialConnectors);
+   }
+}

Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/ConnectionFactoryProperties.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/ra/ConnectionFactoryProperties.java	2011-02-10 02:23:25 UTC (rev 10195)
+++ branches/HORNETQ-316/src/main/org/hornetq/ra/ConnectionFactoryProperties.java	2011-02-10 10:25:47 UTC (rev 10196)
@@ -103,6 +103,8 @@
 
    private Integer threadPoolMaxSize;
 
+   private Map<String, Object> discoveryPluginParameters;
+
    /**
     * @return the transportType
     */
@@ -122,6 +124,17 @@
       hasBeenUpdated = true;
    }
 
+   public Map<String, Object> getParsedDiscoveryPluginParameters()
+   {
+      return discoveryPluginParameters;
+   }
+
+   public void setParsedDiscoveryPluginParameters(final Map<String, Object> discoveryPluginParameters)
+   {
+      this.discoveryPluginParameters = discoveryPluginParameters;
+      hasBeenUpdated = true;
+   }
+
    public void setConnectorClassName(final String value)
    {
       connectorClassName = value;

Modified: branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java	2011-02-10 02:23:25 UTC (rev 10195)
+++ branches/HORNETQ-316/src/main/org/hornetq/ra/HornetQResourceAdapter.java	2011-02-10 10:25:47 UTC (rev 10196)
@@ -85,6 +85,11 @@
    private String unparsedProperties;
 
    /**
+    * The discovery plugin properties for resource adapter before parsing
+    */
+   private String unparsedDiscoveryPluginProperties;
+
+   /**
     * Have the factory been configured
     */
    private final AtomicBoolean configured;
@@ -262,6 +267,20 @@
       }
    }
 
+   public String getDiscoveryPluginParameters()
+   {
+      return unparsedDiscoveryPluginProperties;
+   }
+   
+   public void setDiscoveryPluginProperties(final String config)
+   {
+      if(config != null)
+      {
+         this.unparsedDiscoveryPluginProperties = config;
+         raProperties.setParsedDiscoveryPluginParameters(Util.parseConfig(config));
+      }
+   }
+   
    /**
     * Get the discovery group name
     *
@@ -1405,7 +1424,6 @@
       }
       else if (discoveryAddress != null)
       {
-         // FIXME make discovery stategy pluggable with configuration
          Map<String,Object> params = new HashMap<String,Object>();
          
          Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
@@ -1433,6 +1451,25 @@
             cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
          }
       }
+      else if (this.unparsedDiscoveryPluginProperties != null)
+      {
+         // for another discovery strategy
+         Map<String, Object> discoveryPluginParams =
+            overrideConnectionParameters(overrideProperties.getParsedDiscoveryPluginParameters(),raProperties.getParsedDiscoveryPluginParameters());
+         
+         String serverLocatorClassName = (String)discoveryPluginParams.get("server-locator-class");
+         
+         DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration(serverLocatorClassName, discoveryPluginParams, null);
+         
+         if (ha)
+         {
+            cf = HornetQJMSClient.createConnectionFactoryWithHA(groupConfiguration, JMSFactoryType.XA_CF);
+         }
+         else
+         {
+            cf = HornetQJMSClient.createConnectionFactoryWithoutHA(groupConfiguration, JMSFactoryType.XA_CF);
+         }
+      }
       else
       {
          throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for HornetQ ResourceAdapter Connection Factory");



More information about the hornetq-commits mailing list