Author: ataylor
Date: 2010-08-16 08:53:50 -0400 (Mon, 16 Aug 2010)
New Revision: 9550
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
Log:
added handling of max hops for creating bridges, abstracted topology
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration>
connectorPair, boolean last);
+ void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration>
connectorPair, boolean last, int distance);
void nodeDown(String nodeID);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -1160,7 +1160,7 @@
}
else
{
- serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(),
topMessage.isLast());
+ serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(),
topMessage.isLast(), topMessage.getDistance());
}
}
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -44,6 +44,9 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.impl.Topology;
+import org.hornetq.core.server.cluster.impl.TopologyMember;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -51,8 +54,6 @@
* A ServerLocatorImpl
*
* @author Tim Fox
- *
- *
*/
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener,
Serializable
{
@@ -63,7 +64,7 @@
private final boolean ha;
private boolean clusterConnection;
-
+
private final String discoveryAddress;
private final int discoveryPort;
@@ -74,7 +75,7 @@
private TransportConfiguration[] initialConnectors;
- private Map<String, Pair<TransportConfiguration, TransportConfiguration>>
topology = new HashMap<String,
Pair<TransportConfiguration,TransportConfiguration>>();
+ private Topology topology = new Topology();
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -188,7 +189,7 @@
globalScheduledThreadPool =
Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- factory);
+ factory);
}
return globalScheduledThreadPool;
@@ -205,8 +206,8 @@
else
{
ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-factory-threads-" +
System.identityHashCode(this),
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
if (threadPoolMaxSize == -1)
{
@@ -218,8 +219,8 @@
}
factory = new
HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" +
System.identityHashCode(this),
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
scheduledThreadPool =
Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
}
@@ -252,14 +253,14 @@
try
{
Class<?> clazz =
loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy)
clazz.newInstance();
return null;
}
catch (Exception e)
{
throw new IllegalArgumentException("Unable to instantiate load
balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
+ "\"",
+ e);
}
}
});
@@ -289,11 +290,11 @@
}
discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryAddress,
- lbAddress,
- groupAddress,
- discoveryPort,
- discoveryRefreshTimeout);
+ discoveryAddress,
+ lbAddress,
+ groupAddress,
+ discoveryPort,
+ discoveryRefreshTimeout);
discoveryGroup.registerListener(this);
@@ -318,7 +319,7 @@
this.initialConnectors = transportConfigs;
this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
-
+
discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
@@ -378,12 +379,13 @@
initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
+
clusterConnection = false;
}
/**
* Create a ServerLocatorImpl using UDP discovery to lookup cluster
+ *
* @param discoveryAddress
* @param discoveryPort
*/
@@ -394,6 +396,7 @@
/**
* Create a ServerLocatorImpl using a static list of live servers
+ *
* @param transportConfigs
*/
public ServerLocatorImpl(final boolean useHA, final TransportConfiguration...
transportConfigs)
@@ -468,7 +471,7 @@
}
}
}
-
+
public ClientSessionFactory createSessionFactory(final TransportConfiguration
transportConfiguration) throws Exception
{
if (closed)
@@ -486,19 +489,19 @@
}
ClientSessionFactory factory = new ClientSessionFactoryImpl(this,
-
transportConfiguration,
-
failoverOnServerShutdown,
- callTimeout,
-
clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
-
retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
-
failoverOnInitialConnection,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ transportConfiguration,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
factories.add(factory);
@@ -530,7 +533,7 @@
if (!ok)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial
broadcast from cluster");
+ "Timed out waiting to receive initial broadcast from
cluster");
}
}
@@ -551,19 +554,19 @@
try
{
factory = new ClientSessionFactoryImpl(this,
- tc,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- failoverOnInitialConnection,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ tc,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
}
catch (HornetQException e)
{
@@ -574,7 +577,7 @@
if (topologyArray != null && attempts == topologyArray.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried
with all available servers.");
+ "Cannot connect to server(s). Tried with all available
servers.");
}
retry = true;
@@ -613,7 +616,7 @@
if (toWait <= 0)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive cluster
topology");
+ "Timed out waiting to receive cluster topology");
}
}
@@ -1010,7 +1013,7 @@
{
this.clusterConnection = clusterConnection;
}
-
+
public boolean isClusterConnection()
{
return clusterConnection;
@@ -1095,7 +1098,7 @@
return;
}
- topology.remove(nodeID);
+ topology.removeMember(nodeID);
if (!topology.isEmpty())
{
@@ -1117,15 +1120,16 @@
}
public synchronized void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
- final boolean last)
+ final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
+ final boolean last,
+ final int distance)
{
if (!ha)
{
return;
}
- topology.put(nodeID, connectorPair);
+ topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
updateArraysAndPairs();
@@ -1136,8 +1140,7 @@
for (ClusterTopologyListener listener : topologyListeners)
{
- System.out.println(this.nodeID + " notified that " + nodeID + "
is UP");
- listener.nodeUP(nodeID, connectorPair, last);
+ listener.nodeUP(nodeID, connectorPair, last, distance);
}
// Notify if waiting on getting topology
@@ -1147,18 +1150,18 @@
private void updateArraysAndPairs()
{
- topologyArray = (Pair<TransportConfiguration,
TransportConfiguration>[])Array.newInstance(Pair.class,
-
topology.size());
+ topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])
Array.newInstance(Pair.class,
+ topology.size());
int count = 0;
- for (Pair<TransportConfiguration, TransportConfiguration> pair :
topology.values())
+ for (TopologyMember pair : topology.getMembers())
{
- if (pair.b != null)
+ if (pair.getConnector().b != null)
{
- pairs.put(pair.a, pair.b);
+ pairs.put(pair.getConnector().a, pair.getConnector().b);
}
- topologyArray[count++] = pair;
+ topologyArray[count++] = pair.getConnector();
}
}
@@ -1166,14 +1169,16 @@
{
List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
- this.initialConnectors =
(TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
newConnectors.size());
+ this.initialConnectors = (TransportConfiguration[])
Array.newInstance(TransportConfiguration.class, newConnectors.size());
int count = 0;
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
+
+ notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration,
TransportConfiguration>(entry.getConnector(), null), true, 1);
}
-
+
System.out.println(">>>>>>>> Discovered initial
connectors= " + Arrays.asList(initialConnectors));
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -42,7 +42,7 @@
void removeClusterTopologyListener(ClusterTopologyListener listener);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance);
void notifyNodeDown(String nodeID);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -109,9 +109,9 @@
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last, distance + 1));
}
public void nodeDown(String nodeID)
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -39,11 +39,13 @@
private boolean last;
+ private int distance;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ClusterTopologyChangeMessage(final String nodeID, final
Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+ public ClusterTopologyChangeMessage(final String nodeID, final
Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int
distance)
{
super(PacketImpl.CLUSTER_TOPOLOGY);
@@ -54,6 +56,8 @@
this.last = last;
this.exit = false;
+
+ this.distance = distance;
}
public ClusterTopologyChangeMessage(final String nodeID)
@@ -91,7 +95,17 @@
{
return exit;
}
-
+
+ /**
+  * @return the distance value carried by this message; it is written to and
+  *         read from the wire as an int alongside the {@code last} flag
+  */
+ public int getDistance()
+ {
+    return this.distance;
+ }
+
+ /**
+  * Overwrites the distance value carried by this message.
+  *
+  * @param distance the new distance value
+  */
+ public void setDistance(int distance)
+ {
+    this.distance = distance;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
@@ -110,6 +124,7 @@
buffer.writeBoolean(false);
}
buffer.writeBoolean(last);
+ buffer.writeInt(distance);
}
}
@@ -135,6 +150,7 @@
}
pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
last = buffer.readBoolean();
+ distance = buffer.readInt();
}
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -48,5 +48,5 @@
void notifyClientsNodeDown(String nodeID);
- void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean b);
+ void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean b, int distance);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -324,14 +324,16 @@
public synchronized void nodeUP(final String nodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
- final boolean last)
+ final boolean last,
+ final int distance)
{
- if (nodeID.equals(nodeUUID.toString()))
+ // we only create a bridge if the node isn't ourselves and is at most 1 hop away
+ if (nodeID.equals(nodeUUID.toString()) || distance > 1)
{
return;
}
- server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false);
+ server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false,
distance);
try
{
@@ -384,7 +386,7 @@
{
if (connector.equals(staticConnector))
{
- nodeUP(nodeID, pair, false);
+ nodeUP(nodeID, pair, false, 0);
}
}
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -99,13 +99,7 @@
// they correspond to cluster connections on *other nodes connected to this one*
private Set<ClusterTopologyListener> clusterConnectionListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
- /*
- * topology describes the other cluster nodes that this server knows about:
- *
- * keys are node IDs
- * values are a pair of live/backup transport configurations
- */
- private Map<String, Pair<TransportConfiguration, TransportConfiguration>>
topology = new HashMap<String,
Pair<TransportConfiguration,TransportConfiguration>>();
+ private Topology topology = new Topology();
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final HornetQServer server,
@@ -217,7 +211,7 @@
public void notifyClientsNodeDown(String nodeID)
{
- topology.remove(nodeID);
+ topology.removeMember(nodeID);
for (ClusterTopologyListener listener : clientListeners)
{
@@ -227,13 +221,14 @@
public void notifyClientsNodeUp(String nodeID,
Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
- boolean last)
+ boolean last,
+ int distance)
{
- topology.put(nodeID, connectorPair);
+ topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, connectorPair, last);
+ listener.nodeUP(nodeID, connectorPair, last, distance);
}
}
@@ -276,12 +271,7 @@
}
// We now need to send the current topology to the client
- int count = 0;
- for (Map.Entry<String, Pair<TransportConfiguration,
TransportConfiguration>> entry : topology.entrySet())
- {
- System.out.println("ClusterManagerImpl.addClusterTopologyListener() on
" + nodeUUID + " -- " + entry);
- listener.nodeUP(entry.getKey(), entry.getValue(), ++count == topology.size());
- }
+ topology.fireListeners(listener);
}
public synchronized void removeClusterTopologyListener(final ClusterTopologyListener
listener,
@@ -320,30 +310,30 @@
String nodeID = server.getNodeID().toString();
- Pair<TransportConfiguration, TransportConfiguration> pair =
topology.get(nodeID);
+ TopologyMember member = topology.getMember(nodeID);
- if (pair == null)
+ if (member == null)
{
if (backup)
{
- pair = new Pair<TransportConfiguration, TransportConfiguration>(null,
cc.getConnector());
+ member = new TopologyMember(new Pair<TransportConfiguration,
TransportConfiguration>(null, cc.getConnector()), 0);
}
else
{
- pair = new Pair<TransportConfiguration,
TransportConfiguration>(cc.getConnector(), null);
+ member = new TopologyMember(new Pair<TransportConfiguration,
TransportConfiguration>(cc.getConnector(), null), 0);
}
- topology.put(nodeID, pair);
+ topology.addMember(nodeID, member);
}
else
{
if (backup)
{
- pair.b = cc.getConnector();
+ // FIXME: the existing member's backup connector is no longer updated here (was: pair.b = cc.getConnector();)
}
else
{
- pair.a = cc.getConnector();
+ // FIXME: the existing member's live connector is no longer updated here (was: pair.a = cc.getConnector();)
}
}
@@ -351,12 +341,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, pair, false);
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, pair, false);
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
}
@@ -665,5 +655,4 @@
}
return transformer;
}
-
}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.impl;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Registry of cluster nodes keyed by node ID.
+ * <p>
+ * Every method that touches the backing map is synchronized on this object, so
+ * readers and writers always observe a consistent membership.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         Created Aug 16, 2010
+ */
+public class Topology
+{
+   /*
+    * topology describes the cluster nodes that the owner of this object knows about:
+    *
+    * keys are node IDs
+    * values are TopologyMember instances (live/backup connector pair plus distance)
+    */
+   private final Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+
+   /**
+    * Stores {@code member} under {@code nodeId}, replacing any member
+    * previously stored for that ID.
+    *
+    * @param nodeId the ID of the node
+    * @param member the member to record for that node
+    */
+   public synchronized void addMember(String nodeId, TopologyMember member)
+   {
+      topology.put(nodeId, member);
+   }
+
+   /**
+    * Removes the member stored under {@code nodeId}, if there is one.
+    *
+    * @param nodeId the ID of the node to forget
+    */
+   public synchronized void removeMember(String nodeId)
+   {
+      topology.remove(nodeId);
+   }
+
+   /**
+    * Replays the current membership to {@code listener} by calling
+    * {@code nodeUP} once per member. The {@code last} argument is true only
+    * for the final member delivered.
+    *
+    * @param listener the listener to notify
+    */
+   public synchronized void fireListeners(ClusterTopologyListener listener)
+   {
+      int count = 0;
+      for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
+      {
+         TopologyMember member = entry.getValue();
+         boolean last = ++count == topology.size();
+         listener.nodeUP(entry.getKey(), member.getConnector(), last, member.getDistance());
+      }
+   }
+
+   /**
+    * @param nodeID the ID of the node to look up
+    * @return the member stored under {@code nodeID}, or null if none is stored
+    */
+   public synchronized TopologyMember getMember(String nodeID)
+   {
+      return topology.get(nodeID);
+   }
+
+   /**
+    * @return true if no member is currently stored
+    */
+   public synchronized boolean isEmpty()
+   {
+      return topology.isEmpty();
+   }
+
+   /**
+    * Returns a snapshot of the current members. The returned collection is a
+    * copy, so callers can iterate it while other threads add or remove
+    * members without risking a ConcurrentModificationException.
+    *
+    * @return a copy of the members stored at the time of the call
+    */
+   public synchronized Collection<TopologyMember> getMembers()
+   {
+      // fully qualified so that no additional import is required
+      return new java.util.ArrayList<TopologyMember>(topology.values());
+   }
+
+   /**
+    * @return the number of members currently stored
+    */
+   public synchronized int size()
+   {
+      return topology.size();
+   }
+}
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.impl;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * Holds what a {@link Topology} records for one node: the connector pair
+ * and the distance value. Both are fixed at construction.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         Created Aug 16, 2010
+ */
+public class TopologyMember
+{
+   private final int distance;
+
+   private final Pair<TransportConfiguration, TransportConfiguration> connector;
+
+   /**
+    * @param connector the connector pair recorded for the node
+    * @param distance  the distance value recorded for the node
+    */
+   public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
+   {
+      this.distance = distance;
+      this.connector = connector;
+   }
+
+   /**
+    * @return the distance value given at construction
+    */
+   public int getDistance()
+   {
+      return this.distance;
+   }
+
+   /**
+    * @return the connector pair given at construction (the same instance, not a copy)
+    */
+   public Pair<TransportConfiguration, TransportConfiguration> getConnector()
+   {
+      return this.connector;
+   }
+}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-08-16
12:42:35 UTC (rev 9549)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-08-16
12:53:50 UTC (rev 9550)
@@ -14,7 +14,13 @@
package org.hornetq.tests.integration.cluster.distribution;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
+import java.util.Map;
+import java.util.Set;
+
/**
* A OneWayChainClusterTest
*
@@ -337,4 +343,57 @@
verifyNotReceive(0, 1);
}
+ /**
+  * Configures cluster connections 0-1, 1-2, 2-3, 3-4 and 4-X, starts servers
+  * 0 to 4 and checks that every server exposes exactly one cluster connection
+  * holding exactly one message flow record.
+  */
+ public void testChainClusterConnections() throws Exception
+ {
+    setupClusterConnection("cluster0-1", 0, 1, "queues", false, 4, isNetty());
+    setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
+    setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
+    setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+    setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
+
+    startServers(0, 1, 2, 3, 4);
+    Thread.sleep(2000);
+
+    // Look up the connection set of EACH server. Previously the lookups for
+    // servers 1-4 discarded their result, so server 0 was checked five times.
+    // NOTE(review): server 4's connection targets node -1; confirm that it is
+    // really expected to hold one record now that it is actually inspected.
+    for (int node = 0; node <= 4; node++)
+    {
+       Set<ClusterConnection> connectionSet = getServer(node).getClusterManager().getClusterConnections();
+       assertNotNull(connectionSet);
+       assertEquals("cluster connections on server " + node, 1, connectionSet.size());
+       ClusterConnectionImpl ccon = (ClusterConnectionImpl) connectionSet.iterator().next();
+
+       Map<String, MessageFlowRecord> records = ccon.getRecords();
+       assertNotNull(records);
+       assertEquals("message flow records on server " + node, 1, records.size());
+    }
+
+    System.out.println("OneWayChainClusterTest.testChainClusterConnections");
+ }
}