[jboss-cvs] JBoss Messaging SVN: r5452 - in trunk: src/main/org/jboss/messaging/core/client/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 2 10:06:47 EST 2008


Author: jmesnil
Date: 2008-12-02 10:06:47 -0500 (Tue, 02 Dec 2008)
New Revision: 5452

Added:
   trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java
   trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java
   trunk/src/main/org/jboss/messaging/core/cluster/impl/
   trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
Removed:
   trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java
Modified:
   trunk/build-messaging.xml
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
Log:
moved cluster classes used on both client & server sides to o.j.m.core.cluster
added this package to jbm-core-client.jar

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/build-messaging.xml	2008-12-02 15:06:47 UTC (rev 5452)
@@ -65,7 +65,7 @@
    <property name="security.jar.name" value="jbm-jbossas-security.jar"/>
    <property name="bootstrap.jar.name" value="jbm-bootstrap.jar"/>
    <property name="logging.jar.name" value="jbm-logging.jar"/>
-   <property name="core.client..jar.name" value="jbm-core-client.jar"/>
+   <property name="core.client.jar.name" value="jbm-core-client.jar"/>
 
    <!--source and build dirs-->
    <property name="build.dir" value="build"/>
@@ -583,7 +583,7 @@
    </target>
 
    <target name="jar-client" depends="compile-core">
-      <jar jarfile="${build.jars.dir}/${core.client..jar.name}">
+      <jar jarfile="${build.jars.dir}/${core.client.jar.name}">
          <fileset dir="${build.core.classes.dir}">
             <include name="version.properties"/>
             <include name="org/jboss/messaging/core/client/**/*.class"/>
@@ -591,6 +591,7 @@
             <include name="org/jboss/messaging/core/logging/**/*.class"/>
             <include name="org/jboss/messaging/core/remoting/**/*.class"/>
             <include name="org/jboss/messaging/util/**/*.class"/>
+             <include name="org/jboss/messaging/core/cluster/**/*.class"/>
             <include name="org/jboss/messaging/core/config/**/*.class"/>
             <include name="org/jboss/messaging/core/list/**/*.class"/>
             <include name="org/jboss/messaging/core/message/**/*.class"/>
@@ -634,7 +635,7 @@
             <include name="${security.jar.name}"/>
             <include name="${bootstrap.jar.name}"/>
             <include name="${logging.jar.name}"/>
-            <include name="${core.client..jar.name}"/>
+            <include name="${core.client.jar.name}"/>
          </fileset>
          <fileset dir="${jboss.microcontainer.lib}">
             <include name="jboss-container.jar"/>

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -22,13 +22,13 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ConnectionLoadBalancingPolicy;
 import org.jboss.messaging.core.client.ConnectionManager;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.server.cluster.DiscoveryListener;
-import org.jboss.messaging.core.server.cluster.impl.DiscoveryGroupImpl;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
 import org.jboss.messaging.util.Pair;
 
 /**

Copied: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java (from rev 5446, trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.util.Pair;
+
+/**
+ * A DiscoveryGroup
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 18 Nov 2008 09:26:54
+ *
+ *
+ */
+public interface DiscoveryGroup
+{
+   void start() throws Exception;
+   
+   void stop() throws Exception;
+
+   List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors();
+   
+   boolean waitForBroadcast(long timeout);
+   
+   void registerListener(final DiscoveryListener listener);
+   
+   void unregisterListener(final DiscoveryListener listener);
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Copied: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java (from rev 5446, trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.cluster;
+
+/**
+ * A DiscoveryListener
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 17 Nov 2008 14:30:39
+ *
+ *
+ */
+public interface DiscoveryListener
+{
+   void connectorsChanged();
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Copied: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java (from rev 5446, trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -0,0 +1,297 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.cluster.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.InterruptedIOException;
+import java.io.ObjectInputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.cluster.DiscoveryListener;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.util.Pair;
+
+/**
+ * A DiscoveryGroupImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 17 Nov 2008 13:21:45
+ *
+ */
+public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
+{
+   private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
+   
+   private static final int SOCKET_TIMEOUT = 500;
+   
+   private MulticastSocket socket;
+
+   private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+        
+   private final Thread thread;
+   
+   private boolean received;
+   
+   private final Object waitLock = new Object();
+   
+   private final Map<Pair<TransportConfiguration, TransportConfiguration>, Long> connectors = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Long>();
+   
+   private final long timeout;
+   
+   private volatile boolean started;
+      
+   public DiscoveryGroupImpl(final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
+   {
+      socket = new MulticastSocket(groupPort);
+
+      socket.joinGroup(groupAddress);
+      
+      socket.setSoTimeout(SOCKET_TIMEOUT);
+      
+      this.timeout = timeout;
+      
+      thread = new Thread(this);
+      
+      thread.setDaemon(true);
+   }
+   
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+      
+      thread.start();
+      
+      started = true;
+   }
+   
+   public void stop()
+   {
+      synchronized (this)
+      {
+         if (!started)
+         {
+            return;
+         }
+         
+         started = false;
+      }
+      
+      try
+      {
+         thread.join();
+      }
+      catch (InterruptedException e)
+      {        
+      }
+      
+      socket.close();           
+   }
+   
+   public boolean isStarted()
+   {
+      return started;
+   }
+        
+   public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
+   {
+      return new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(connectors.keySet());
+   }
+   
+   public boolean waitForBroadcast(final long timeout)
+   {
+      synchronized (waitLock)
+      { 
+         long start = System.currentTimeMillis();
+         
+         long toWait = timeout;
+         
+         while (!received && toWait > 0)
+         {      
+            try
+            {
+               waitLock.wait(toWait);
+            }
+            catch (InterruptedException e)
+            {               
+            }
+            
+            long now = System.currentTimeMillis();
+            
+            toWait -= now - start;
+
+            start = now;                       
+         }
+         
+         boolean ret = received;
+         
+         received = false;
+         
+         return ret;
+      }
+   }
+
+   public void run()
+   {
+      //TODO - can we use a smaller buffer size?
+      final byte[] data = new byte[65535];
+      
+      final DatagramPacket packet = new DatagramPacket(data, data.length);
+      
+      try
+      {      
+         while (true)
+         {
+            if (!started)
+            {
+               return;
+            }
+            
+            try
+            {
+               socket.receive(packet);
+            }
+            catch (InterruptedIOException e)
+            {
+               if (!started)
+               {
+                  return;
+               }
+               else
+               {
+                  continue;
+               }
+            }
+             
+            ByteArrayInputStream bis = new ByteArrayInputStream(data);
+            
+            ObjectInputStream ois = new ObjectInputStream(bis);
+            
+            int size = ois.readInt();
+            
+            boolean changed = false;
+            
+            synchronized (this)
+            {            
+               for (int i = 0; i < size; i++)
+               {
+                  TransportConfiguration connector = (TransportConfiguration)ois.readObject();
+                  
+                  boolean existsBackup = ois.readBoolean();
+                  
+                  TransportConfiguration backupConnector = null;
+                  
+                  if (existsBackup)
+                  {
+                     backupConnector = (TransportConfiguration)ois.readObject();
+                  }
+                  
+                  Pair<TransportConfiguration, TransportConfiguration> connectorPair =
+                     new Pair<TransportConfiguration, TransportConfiguration>(connector, backupConnector);
+                  
+                  Long oldVal = connectors.put(connectorPair, System.currentTimeMillis());
+                  
+                  if (oldVal == null)
+                  {
+                     changed = true;
+                  }
+               }
+               
+               long now = System.currentTimeMillis();
+               
+               Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet().iterator();
+               
+               //Weed out any expired connectors
+               
+               while (iter.hasNext())
+               {
+                  Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long> entry = iter.next();
+                  
+                  if (entry.getValue() + timeout <= now)
+                  {
+                     iter.remove();
+                     
+                     changed = true;
+                  }
+               }
+            }
+            
+            packet.setLength(data.length);
+            
+            synchronized (waitLock)
+            {
+               received = true;
+               
+               waitLock.notify();
+            }
+            
+            if (changed)
+            {
+               callListeners();
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to receive datagram", e);
+      }
+   }
+   
+   public synchronized void registerListener(final DiscoveryListener listener)
+   {
+      this.listeners.add(listener);
+   }
+   
+   public synchronized void unregisterListener(final DiscoveryListener listener)
+   {
+      this.listeners.remove(listener);
+   }
+   
+   private void callListeners()
+   {      
+      for (DiscoveryListener listener: listeners)
+      {
+         try
+         {
+            listener.connectorsChanged();
+         }
+         catch (Throwable t)
+         {
+            //Catch it so exception doesn't prevent other listeners from running
+            log.error("Failed to call discovery listener", t);
+         }
+      }
+   }
+}


Property changes on: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java	2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -1,50 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster;
-
-import java.util.List;
-
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.util.Pair;
-
-/**
- * A DiscoveryGroup
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 18 Nov 2008 09:26:54
- *
- *
- */
-public interface DiscoveryGroup extends MessagingComponent
-{
-   List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors();
-   
-   boolean waitForBroadcast(long timeout);
-   
-   void registerListener(final DiscoveryListener listener);
-   
-   void unregisterListener(final DiscoveryListener listener);
-}

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java	2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -1,38 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster;
-
-/**
- * A DiscoveryListener
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 17 Nov 2008 14:30:39
- *
- *
- */
-public interface DiscoveryListener
-{
-   void connectorsChanged();
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -32,6 +32,8 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
@@ -42,7 +44,6 @@
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
 import org.jboss.messaging.core.server.cluster.ClusterManager;
-import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
 import org.jboss.messaging.core.server.cluster.MessageFlow;
 import org.jboss.messaging.core.server.cluster.Transformer;
 import org.jboss.messaging.core.settings.HierarchicalRepository;

Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java	2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -1,297 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.InterruptedIOException;
-import java.io.ObjectInputStream;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.server.cluster.DiscoveryListener;
-import org.jboss.messaging.util.Pair;
-
-/**
- * A DiscoveryGroupImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
- * Created 17 Nov 2008 13:21:45
- *
- */
-public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
-{
-   private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
-   
-   private static final int SOCKET_TIMEOUT = 500;
-   
-   private MulticastSocket socket;
-
-   private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
-        
-   private final Thread thread;
-   
-   private boolean received;
-   
-   private final Object waitLock = new Object();
-   
-   private final Map<Pair<TransportConfiguration, TransportConfiguration>, Long> connectors = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Long>();
-   
-   private final long timeout;
-   
-   private volatile boolean started;
-      
-   public DiscoveryGroupImpl(final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
-   {
-      socket = new MulticastSocket(groupPort);
-
-      socket.joinGroup(groupAddress);
-      
-      socket.setSoTimeout(SOCKET_TIMEOUT);
-      
-      this.timeout = timeout;
-      
-      thread = new Thread(this);
-      
-      thread.setDaemon(true);
-   }
-   
-   public synchronized void start() throws Exception
-   {
-      if (started)
-      {
-         return;
-      }
-      
-      thread.start();
-      
-      started = true;
-   }
-   
-   public void stop()
-   {
-      synchronized (this)
-      {
-         if (!started)
-         {
-            return;
-         }
-         
-         started = false;
-      }
-      
-      try
-      {
-         thread.join();
-      }
-      catch (InterruptedException e)
-      {        
-      }
-      
-      socket.close();           
-   }
-   
-   public boolean isStarted()
-   {
-      return started;
-   }
-        
-   public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
-   {
-      return new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(connectors.keySet());
-   }
-   
-   public boolean waitForBroadcast(final long timeout)
-   {
-      synchronized (waitLock)
-      { 
-         long start = System.currentTimeMillis();
-         
-         long toWait = timeout;
-         
-         while (!received && toWait > 0)
-         {      
-            try
-            {
-               waitLock.wait(toWait);
-            }
-            catch (InterruptedException e)
-            {               
-            }
-            
-            long now = System.currentTimeMillis();
-            
-            toWait -= now - start;
-
-            start = now;                       
-         }
-         
-         boolean ret = received;
-         
-         received = false;
-         
-         return ret;
-      }
-   }
-
-   public void run()
-   {
-      //TODO - can we use a smaller buffer size?
-      final byte[] data = new byte[65535];
-      
-      final DatagramPacket packet = new DatagramPacket(data, data.length);
-      
-      try
-      {      
-         while (true)
-         {
-            if (!started)
-            {
-               return;
-            }
-            
-            try
-            {
-               socket.receive(packet);
-            }
-            catch (InterruptedIOException e)
-            {
-               if (!started)
-               {
-                  return;
-               }
-               else
-               {
-                  continue;
-               }
-            }
-             
-            ByteArrayInputStream bis = new ByteArrayInputStream(data);
-            
-            ObjectInputStream ois = new ObjectInputStream(bis);
-            
-            int size = ois.readInt();
-            
-            boolean changed = false;
-            
-            synchronized (this)
-            {            
-               for (int i = 0; i < size; i++)
-               {
-                  TransportConfiguration connector = (TransportConfiguration)ois.readObject();
-                  
-                  boolean existsBackup = ois.readBoolean();
-                  
-                  TransportConfiguration backupConnector = null;
-                  
-                  if (existsBackup)
-                  {
-                     backupConnector = (TransportConfiguration)ois.readObject();
-                  }
-                  
-                  Pair<TransportConfiguration, TransportConfiguration> connectorPair =
-                     new Pair<TransportConfiguration, TransportConfiguration>(connector, backupConnector);
-                  
-                  Long oldVal = connectors.put(connectorPair, System.currentTimeMillis());
-                  
-                  if (oldVal == null)
-                  {
-                     changed = true;
-                  }
-               }
-               
-               long now = System.currentTimeMillis();
-               
-               Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet().iterator();
-               
-               //Weed out any expired connectors
-               
-               while (iter.hasNext())
-               {
-                  Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long> entry = iter.next();
-                  
-                  if (entry.getValue() + timeout <= now)
-                  {
-                     iter.remove();
-                     
-                     changed = true;
-                  }
-               }
-            }
-            
-            packet.setLength(data.length);
-            
-            synchronized (waitLock)
-            {
-               received = true;
-               
-               waitLock.notify();
-            }
-            
-            if (changed)
-            {
-               callListeners();
-            }
-         }
-      }
-      catch (Exception e)
-      {
-         log.error("Failed to receive datagram", e);
-      }
-   }
-   
-   public synchronized void registerListener(final DiscoveryListener listener)
-   {
-      this.listeners.add(listener);
-   }
-   
-   public synchronized void unregisterListener(final DiscoveryListener listener)
-   {
-      this.listeners.remove(listener);
-   }
-   
-   private void callListeners()
-   {      
-      for (DiscoveryListener listener: listeners)
-      {
-         try
-         {
-            listener.connectorsChanged();
-         }
-         catch (Throwable t)
-         {
-            //Catch it so exception doesn't prevent other listeners from running
-            log.error("Failed to call discovery listener", t);
-         }
-      }
-   }
-}

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java	2008-12-02 15:06:47 UTC (rev 5452)
@@ -30,6 +30,8 @@
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -37,8 +39,6 @@
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.server.cluster.DiscoveryListener;
 import org.jboss.messaging.core.server.cluster.Forwarder;
 import org.jboss.messaging.core.server.cluster.MessageFlow;
 import org.jboss.messaging.core.server.cluster.Transformer;




More information about the jboss-cvs-commits mailing list