[jboss-cvs] JBoss Messaging SVN: r5452 - in trunk: src/main/org/jboss/messaging/core/client/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 2 10:06:47 EST 2008
Author: jmesnil
Date: 2008-12-02 10:06:47 -0500 (Tue, 02 Dec 2008)
New Revision: 5452
Added:
trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java
trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java
trunk/src/main/org/jboss/messaging/core/cluster/impl/
trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
Removed:
trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java
trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java
Modified:
trunk/build-messaging.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
Log:
moved cluster classes used on both client & server sides to o.j.m.core.cluster
added this package to jbm-core-client.jar
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/build-messaging.xml 2008-12-02 15:06:47 UTC (rev 5452)
@@ -65,7 +65,7 @@
<property name="security.jar.name" value="jbm-jbossas-security.jar"/>
<property name="bootstrap.jar.name" value="jbm-bootstrap.jar"/>
<property name="logging.jar.name" value="jbm-logging.jar"/>
- <property name="core.client..jar.name" value="jbm-core-client.jar"/>
+ <property name="core.client.jar.name" value="jbm-core-client.jar"/>
<!--source and build dirs-->
<property name="build.dir" value="build"/>
@@ -583,7 +583,7 @@
</target>
<target name="jar-client" depends="compile-core">
- <jar jarfile="${build.jars.dir}/${core.client..jar.name}">
+ <jar jarfile="${build.jars.dir}/${core.client.jar.name}">
<fileset dir="${build.core.classes.dir}">
<include name="version.properties"/>
<include name="org/jboss/messaging/core/client/**/*.class"/>
@@ -591,6 +591,7 @@
<include name="org/jboss/messaging/core/logging/**/*.class"/>
<include name="org/jboss/messaging/core/remoting/**/*.class"/>
<include name="org/jboss/messaging/util/**/*.class"/>
+ <include name="org/jboss/messaging/core/cluster/**/*.class"/>
<include name="org/jboss/messaging/core/config/**/*.class"/>
<include name="org/jboss/messaging/core/list/**/*.class"/>
<include name="org/jboss/messaging/core/message/**/*.class"/>
@@ -634,7 +635,7 @@
<include name="${security.jar.name}"/>
<include name="${bootstrap.jar.name}"/>
<include name="${logging.jar.name}"/>
- <include name="${core.client..jar.name}"/>
+ <include name="${core.client.jar.name}"/>
</fileset>
<fileset dir="${jboss.microcontainer.lib}">
<include name="jboss-container.jar"/>
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -22,13 +22,13 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ConnectionLoadBalancingPolicy;
import org.jboss.messaging.core.client.ConnectionManager;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.server.cluster.DiscoveryListener;
-import org.jboss.messaging.core.server.cluster.impl.DiscoveryGroupImpl;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
import org.jboss.messaging.util.Pair;
/**
Copied: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java (from rev 5446, trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -0,0 +1,53 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.util.Pair;
+
+/**
+ * A DiscoveryGroup
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Nov 2008 09:26:54
+ *
+ *
+ */
+public interface DiscoveryGroup
+{
+ void start() throws Exception;
+
+ void stop() throws Exception;
+
+ List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors();
+
+ boolean waitForBroadcast(long timeout);
+
+ void registerListener(final DiscoveryListener listener);
+
+ void unregisterListener(final DiscoveryListener listener);
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryGroup.java
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java (from rev 5446, trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.cluster;
+
+/**
+ * A DiscoveryListener
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 17 Nov 2008 14:30:39
+ *
+ *
+ */
+public interface DiscoveryListener
+{
+ void connectorsChanged();
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/cluster/DiscoveryListener.java
___________________________________________________________________
Name: svn:mergeinfo
+
Copied: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java (from rev 5446, trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -0,0 +1,297 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.cluster.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.InterruptedIOException;
+import java.io.ObjectInputStream;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.MulticastSocket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.jboss.messaging.core.cluster.DiscoveryListener;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.util.Pair;
+
+/**
+ * A DiscoveryGroupImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 17 Nov 2008 13:21:45
+ *
+ */
+public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
+{
+ private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
+
+ private static final int SOCKET_TIMEOUT = 500;
+
+ private MulticastSocket socket;
+
+ private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
+
+ private final Thread thread;
+
+ private boolean received;
+
+ private final Object waitLock = new Object();
+
+ private final Map<Pair<TransportConfiguration, TransportConfiguration>, Long> connectors = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Long>();
+
+ private final long timeout;
+
+ private volatile boolean started;
+
+ public DiscoveryGroupImpl(final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
+ {
+ socket = new MulticastSocket(groupPort);
+
+ socket.joinGroup(groupAddress);
+
+ socket.setSoTimeout(SOCKET_TIMEOUT);
+
+ this.timeout = timeout;
+
+ thread = new Thread(this);
+
+ thread.setDaemon(true);
+ }
+
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ thread.start();
+
+ started = true;
+ }
+
+ public void stop()
+ {
+ synchronized (this)
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ started = false;
+ }
+
+ try
+ {
+ thread.join();
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ socket.close();
+ }
+
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
+ {
+ return new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(connectors.keySet());
+ }
+
+ public boolean waitForBroadcast(final long timeout)
+ {
+ synchronized (waitLock)
+ {
+ long start = System.currentTimeMillis();
+
+ long toWait = timeout;
+
+ while (!received && toWait > 0)
+ {
+ try
+ {
+ waitLock.wait(toWait);
+ }
+ catch (InterruptedException e)
+ {
+ }
+
+ long now = System.currentTimeMillis();
+
+ toWait -= now - start;
+
+ start = now;
+ }
+
+ boolean ret = received;
+
+ received = false;
+
+ return ret;
+ }
+ }
+
+ public void run()
+ {
+ //TODO - can we use a smaller buffer size?
+ final byte[] data = new byte[65535];
+
+ final DatagramPacket packet = new DatagramPacket(data, data.length);
+
+ try
+ {
+ while (true)
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ try
+ {
+ socket.receive(packet);
+ }
+ catch (InterruptedIOException e)
+ {
+ if (!started)
+ {
+ return;
+ }
+ else
+ {
+ continue;
+ }
+ }
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(data);
+
+ ObjectInputStream ois = new ObjectInputStream(bis);
+
+ int size = ois.readInt();
+
+ boolean changed = false;
+
+ synchronized (this)
+ {
+ for (int i = 0; i < size; i++)
+ {
+ TransportConfiguration connector = (TransportConfiguration)ois.readObject();
+
+ boolean existsBackup = ois.readBoolean();
+
+ TransportConfiguration backupConnector = null;
+
+ if (existsBackup)
+ {
+ backupConnector = (TransportConfiguration)ois.readObject();
+ }
+
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair =
+ new Pair<TransportConfiguration, TransportConfiguration>(connector, backupConnector);
+
+ Long oldVal = connectors.put(connectorPair, System.currentTimeMillis());
+
+ if (oldVal == null)
+ {
+ changed = true;
+ }
+ }
+
+ long now = System.currentTimeMillis();
+
+ Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet().iterator();
+
+ //Weed out any expired connectors
+
+ while (iter.hasNext())
+ {
+ Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long> entry = iter.next();
+
+ if (entry.getValue() + timeout <= now)
+ {
+ iter.remove();
+
+ changed = true;
+ }
+ }
+ }
+
+ packet.setLength(data.length);
+
+ synchronized (waitLock)
+ {
+ received = true;
+
+ waitLock.notify();
+ }
+
+ if (changed)
+ {
+ callListeners();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive datagram", e);
+ }
+ }
+
+ public synchronized void registerListener(final DiscoveryListener listener)
+ {
+ this.listeners.add(listener);
+ }
+
+ public synchronized void unregisterListener(final DiscoveryListener listener)
+ {
+ this.listeners.remove(listener);
+ }
+
+ private void callListeners()
+ {
+ for (DiscoveryListener listener: listeners)
+ {
+ try
+ {
+ listener.connectorsChanged();
+ }
+ catch (Throwable t)
+ {
+ //Catch it so exception doesn't prevent other listeners from running
+ log.error("Failed to call discovery listener", t);
+ }
+ }
+ }
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
___________________________________________________________________
Name: svn:mergeinfo
+
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java 2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryGroup.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -1,50 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster;
-
-import java.util.List;
-
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.server.MessagingComponent;
-import org.jboss.messaging.util.Pair;
-
-/**
- * A DiscoveryGroup
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 18 Nov 2008 09:26:54
- *
- *
- */
-public interface DiscoveryGroup extends MessagingComponent
-{
- List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors();
-
- boolean waitForBroadcast(long timeout);
-
- void registerListener(final DiscoveryListener listener);
-
- void unregisterListener(final DiscoveryListener listener);
-}
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java 2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/DiscoveryListener.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -1,38 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.server.cluster;
-
-/**
- * A DiscoveryListener
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 17 Nov 2008 14:30:39
- *
- *
- */
-public interface DiscoveryListener
-{
- void connectorsChanged();
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -32,6 +32,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.impl.DiscoveryGroupImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.cluster.BroadcastGroupConfiguration;
@@ -42,7 +44,6 @@
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.cluster.BroadcastGroup;
import org.jboss.messaging.core.server.cluster.ClusterManager;
-import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
import org.jboss.messaging.core.server.cluster.MessageFlow;
import org.jboss.messaging.core.server.cluster.Transformer;
import org.jboss.messaging.core.settings.HierarchicalRepository;
Deleted: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java 2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/DiscoveryGroupImpl.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -1,297 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.server.cluster.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.InterruptedIOException;
-import java.io.ObjectInputStream;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.MulticastSocket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.server.cluster.DiscoveryListener;
-import org.jboss.messaging.util.Pair;
-
-/**
- * A DiscoveryGroupImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- * Created 17 Nov 2008 13:21:45
- *
- */
-public class DiscoveryGroupImpl implements Runnable, DiscoveryGroup
-{
- private static final Logger log = Logger.getLogger(DiscoveryGroupImpl.class);
-
- private static final int SOCKET_TIMEOUT = 500;
-
- private MulticastSocket socket;
-
- private final List<DiscoveryListener> listeners = new ArrayList<DiscoveryListener>();
-
- private final Thread thread;
-
- private boolean received;
-
- private final Object waitLock = new Object();
-
- private final Map<Pair<TransportConfiguration, TransportConfiguration>, Long> connectors = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, Long>();
-
- private final long timeout;
-
- private volatile boolean started;
-
- public DiscoveryGroupImpl(final InetAddress groupAddress, final int groupPort, final long timeout) throws Exception
- {
- socket = new MulticastSocket(groupPort);
-
- socket.joinGroup(groupAddress);
-
- socket.setSoTimeout(SOCKET_TIMEOUT);
-
- this.timeout = timeout;
-
- thread = new Thread(this);
-
- thread.setDaemon(true);
- }
-
- public synchronized void start() throws Exception
- {
- if (started)
- {
- return;
- }
-
- thread.start();
-
- started = true;
- }
-
- public void stop()
- {
- synchronized (this)
- {
- if (!started)
- {
- return;
- }
-
- started = false;
- }
-
- try
- {
- thread.join();
- }
- catch (InterruptedException e)
- {
- }
-
- socket.close();
- }
-
- public boolean isStarted()
- {
- return started;
- }
-
- public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getConnectors()
- {
- return new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>(connectors.keySet());
- }
-
- public boolean waitForBroadcast(final long timeout)
- {
- synchronized (waitLock)
- {
- long start = System.currentTimeMillis();
-
- long toWait = timeout;
-
- while (!received && toWait > 0)
- {
- try
- {
- waitLock.wait(toWait);
- }
- catch (InterruptedException e)
- {
- }
-
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
- }
-
- boolean ret = received;
-
- received = false;
-
- return ret;
- }
- }
-
- public void run()
- {
- //TODO - can we use a smaller buffer size?
- final byte[] data = new byte[65535];
-
- final DatagramPacket packet = new DatagramPacket(data, data.length);
-
- try
- {
- while (true)
- {
- if (!started)
- {
- return;
- }
-
- try
- {
- socket.receive(packet);
- }
- catch (InterruptedIOException e)
- {
- if (!started)
- {
- return;
- }
- else
- {
- continue;
- }
- }
-
- ByteArrayInputStream bis = new ByteArrayInputStream(data);
-
- ObjectInputStream ois = new ObjectInputStream(bis);
-
- int size = ois.readInt();
-
- boolean changed = false;
-
- synchronized (this)
- {
- for (int i = 0; i < size; i++)
- {
- TransportConfiguration connector = (TransportConfiguration)ois.readObject();
-
- boolean existsBackup = ois.readBoolean();
-
- TransportConfiguration backupConnector = null;
-
- if (existsBackup)
- {
- backupConnector = (TransportConfiguration)ois.readObject();
- }
-
- Pair<TransportConfiguration, TransportConfiguration> connectorPair =
- new Pair<TransportConfiguration, TransportConfiguration>(connector, backupConnector);
-
- Long oldVal = connectors.put(connectorPair, System.currentTimeMillis());
-
- if (oldVal == null)
- {
- changed = true;
- }
- }
-
- long now = System.currentTimeMillis();
-
- Iterator<Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long>> iter = connectors.entrySet().iterator();
-
- //Weed out any expired connectors
-
- while (iter.hasNext())
- {
- Map.Entry<Pair<TransportConfiguration, TransportConfiguration>, Long> entry = iter.next();
-
- if (entry.getValue() + timeout <= now)
- {
- iter.remove();
-
- changed = true;
- }
- }
- }
-
- packet.setLength(data.length);
-
- synchronized (waitLock)
- {
- received = true;
-
- waitLock.notify();
- }
-
- if (changed)
- {
- callListeners();
- }
- }
- }
- catch (Exception e)
- {
- log.error("Failed to receive datagram", e);
- }
- }
-
- public synchronized void registerListener(final DiscoveryListener listener)
- {
- this.listeners.add(listener);
- }
-
- public synchronized void unregisterListener(final DiscoveryListener listener)
- {
- this.listeners.remove(listener);
- }
-
- private void callListeners()
- {
- for (DiscoveryListener listener: listeners)
- {
- try
- {
- listener.connectorsChanged();
- }
- catch (Throwable t)
- {
- //Catch it so exception doesn't prevent other listeners from running
- log.error("Failed to call discovery listener", t);
- }
- }
- }
-}
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2008-12-02 15:05:46 UTC (rev 5451)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2008-12-02 15:06:47 UTC (rev 5452)
@@ -30,6 +30,8 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
+import org.jboss.messaging.core.cluster.DiscoveryGroup;
+import org.jboss.messaging.core.cluster.DiscoveryListener;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
@@ -37,8 +39,6 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.cluster.DiscoveryGroup;
-import org.jboss.messaging.core.server.cluster.DiscoveryListener;
import org.jboss.messaging.core.server.cluster.Forwarder;
import org.jboss.messaging.core.server.cluster.MessageFlow;
import org.jboss.messaging.core.server.cluster.Transformer;
More information about the jboss-cvs-commits
mailing list