JBoss hornetq SVN: r9550 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/client/impl and 5 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-16 08:53:50 -0400 (Mon, 16 Aug 2010)
New Revision: 9550
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
Log:
added handling of max hops for creating bridges, abstracted topology
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+ void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
void nodeDown(String nodeID);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -1160,7 +1160,7 @@
}
else
{
- serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast(), topMessage.getDistance());
}
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -44,6 +44,9 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.impl.Topology;
+import org.hornetq.core.server.cluster.impl.TopologyMember;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -51,8 +54,6 @@
* A ServerLocatorImpl
*
* @author Tim Fox
- *
- *
*/
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
{
@@ -63,7 +64,7 @@
private final boolean ha;
private boolean clusterConnection;
-
+
private final String discoveryAddress;
private final int discoveryPort;
@@ -74,7 +75,7 @@
private TransportConfiguration[] initialConnectors;
- private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
+ private Topology topology = new Topology();
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -188,7 +189,7 @@
globalScheduledThreadPool = Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- factory);
+ factory);
}
return globalScheduledThreadPool;
@@ -205,8 +206,8 @@
else
{
ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
if (threadPoolMaxSize == -1)
{
@@ -218,8 +219,8 @@
}
factory = new HornetQThreadFactory("HornetQ-client-factory-pinger-threads-" + System.identityHashCode(this),
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
scheduledThreadPool = Executors.newScheduledThreadPool(scheduledThreadPoolMaxSize, factory);
}
@@ -252,14 +253,14 @@
try
{
Class<?> clazz = loader.loadClass(connectionLoadBalancingPolicyClassName);
- loadBalancingPolicy = (ConnectionLoadBalancingPolicy)clazz.newInstance();
+ loadBalancingPolicy = (ConnectionLoadBalancingPolicy) clazz.newInstance();
return null;
}
catch (Exception e)
{
throw new IllegalArgumentException("Unable to instantiate load balancing policy \"" + connectionLoadBalancingPolicyClassName +
- "\"",
- e);
+ "\"",
+ e);
}
}
});
@@ -289,11 +290,11 @@
}
discoveryGroup = new DiscoveryGroupImpl(nodeID,
- discoveryAddress,
- lbAddress,
- groupAddress,
- discoveryPort,
- discoveryRefreshTimeout);
+ discoveryAddress,
+ lbAddress,
+ groupAddress,
+ discoveryPort,
+ discoveryRefreshTimeout);
discoveryGroup.registerListener(this);
@@ -318,7 +319,7 @@
this.initialConnectors = transportConfigs;
this.nodeID = UUIDGenerator.getInstance().generateStringUUID();
-
+
discoveryRefreshTimeout = HornetQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT;
clientFailureCheckPeriod = HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
@@ -378,12 +379,13 @@
initialMessagePacketSize = HornetQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
cacheLargeMessagesClient = HornetQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
-
+
clusterConnection = false;
}
/**
* Create a ServerLocatorImpl using UDP discovery to lookup cluster
+ *
* @param discoveryAddress
* @param discoveryPort
*/
@@ -394,6 +396,7 @@
/**
* Create a ServerLocatorImpl using a static list of live servers
+ *
* @param transportConfigs
*/
public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
@@ -468,7 +471,7 @@
}
}
}
-
+
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
{
if (closed)
@@ -486,19 +489,19 @@
}
ClientSessionFactory factory = new ClientSessionFactoryImpl(this,
- transportConfiguration,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- failoverOnInitialConnection,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ transportConfiguration,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
factories.add(factory);
@@ -530,7 +533,7 @@
if (!ok)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive initial broadcast from cluster");
+ "Timed out waiting to receive initial broadcast from cluster");
}
}
@@ -551,19 +554,19 @@
try
{
factory = new ClientSessionFactoryImpl(this,
- tc,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- failoverOnInitialConnection,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ tc,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ failoverOnInitialConnection,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
}
catch (HornetQException e)
{
@@ -574,7 +577,7 @@
if (topologyArray != null && attempts == topologyArray.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available servers.");
+ "Cannot connect to server(s). Tried with all available servers.");
}
retry = true;
@@ -613,7 +616,7 @@
if (toWait <= 0)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
- "Timed out waiting to receive cluster topology");
+ "Timed out waiting to receive cluster topology");
}
}
@@ -1010,7 +1013,7 @@
{
this.clusterConnection = clusterConnection;
}
-
+
public boolean isClusterConnection()
{
return clusterConnection;
@@ -1095,7 +1098,7 @@
return;
}
- topology.remove(nodeID);
+ topology.removeMember(nodeID);
if (!topology.isEmpty())
{
@@ -1117,15 +1120,16 @@
}
public synchronized void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean last,
+ final int distance)
{
if (!ha)
{
return;
}
- topology.put(nodeID, connectorPair);
+ topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
updateArraysAndPairs();
@@ -1136,8 +1140,7 @@
for (ClusterTopologyListener listener : topologyListeners)
{
- System.out.println(this.nodeID + " notified that " + nodeID + " is UP");
- listener.nodeUP(nodeID, connectorPair, last);
+ listener.nodeUP(nodeID, connectorPair, last, distance);
}
// Notify if waiting on getting topology
@@ -1147,18 +1150,18 @@
private void updateArraysAndPairs()
{
- topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
- topology.size());
+ topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[]) Array.newInstance(Pair.class,
+ topology.size());
int count = 0;
- for (Pair<TransportConfiguration, TransportConfiguration> pair : topology.values())
+ for (TopologyMember pair : topology.getMembers())
{
- if (pair.b != null)
+ if (pair.getConnector().b != null)
{
- pairs.put(pair.a, pair.b);
+ pairs.put(pair.getConnector().a, pair.getConnector().b);
}
- topologyArray[count++] = pair;
+ topologyArray[count++] = pair.getConnector();
}
}
@@ -1166,14 +1169,16 @@
{
List<DiscoveryEntry> newConnectors = discoveryGroup.getDiscoveryEntries();
- this.initialConnectors = (TransportConfiguration[])Array.newInstance(TransportConfiguration.class, newConnectors.size());
+ this.initialConnectors = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, newConnectors.size());
int count = 0;
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
+
+ notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true, 1);
}
-
+
System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -42,7 +42,7 @@
void removeClusterTopologyListener(ClusterTopologyListener listener);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
void notifyNodeDown(String nodeID);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -109,9 +109,9 @@
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last, distance + 1));
}
public void nodeDown(String nodeID)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -39,11 +39,13 @@
private boolean last;
+ private int distance;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
+ public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int distance)
{
super(PacketImpl.CLUSTER_TOPOLOGY);
@@ -54,6 +56,8 @@
this.last = last;
this.exit = false;
+
+ this.distance = distance;
}
public ClusterTopologyChangeMessage(final String nodeID)
@@ -91,7 +95,17 @@
{
return exit;
}
-
+
+ public int getDistance()
+ {
+ return distance;
+ }
+
+ public void setDistance(int distance)
+ {
+ this.distance = distance;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
@@ -110,6 +124,7 @@
buffer.writeBoolean(false);
}
buffer.writeBoolean(last);
+ buffer.writeInt(distance);
}
}
@@ -135,6 +150,7 @@
}
pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
last = buffer.readBoolean();
+ distance = buffer.readInt();
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -48,5 +48,5 @@
void notifyClientsNodeDown(String nodeID);
- void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b);
+ void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b, int distance);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -324,14 +324,16 @@
public synchronized void nodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
+ final boolean last,
+ final int distance)
{
- if (nodeID.equals(nodeUUID.toString()))
+ //we only create a bridge if it isn't ourselves and the node is at most 1 hop away
+ if (nodeID.equals(nodeUUID.toString()) || distance > 1)
{
return;
}
- server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false);
+ server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false, distance);
try
{
@@ -384,7 +386,7 @@
{
if (connector.equals(staticConnector))
{
- nodeUP(nodeID, pair, false);
+ nodeUP(nodeID, pair, false, 0);
}
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -99,13 +99,7 @@
// they correspond to cluster connections on *other nodes connected to this one*
private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
- /*
- * topology describes the other cluster nodes that this server knows about:
- *
- * keys are node IDs
- * values are a pair of live/backup transport configurations
- */
- private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
+ private Topology topology = new Topology();
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final HornetQServer server,
@@ -217,7 +211,7 @@
public void notifyClientsNodeDown(String nodeID)
{
- topology.remove(nodeID);
+ topology.removeMember(nodeID);
for (ClusterTopologyListener listener : clientListeners)
{
@@ -227,13 +221,14 @@
public void notifyClientsNodeUp(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ boolean last,
+ int distance)
{
- topology.put(nodeID, connectorPair);
+ topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, connectorPair, last);
+ listener.nodeUP(nodeID, connectorPair, last, distance);
}
}
@@ -276,12 +271,7 @@
}
// We now need to send the current topology to the client
- int count = 0;
- for (Map.Entry<String, Pair<TransportConfiguration, TransportConfiguration>> entry : topology.entrySet())
- {
- System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " -- " + entry);
- listener.nodeUP(entry.getKey(), entry.getValue(), ++count == topology.size());
- }
+ topology.fireListeners(listener);
}
public synchronized void removeClusterTopologyListener(final ClusterTopologyListener listener,
@@ -320,30 +310,30 @@
String nodeID = server.getNodeID().toString();
- Pair<TransportConfiguration, TransportConfiguration> pair = topology.get(nodeID);
+ TopologyMember member = topology.getMember(nodeID);
- if (pair == null)
+ if (member == null)
{
if (backup)
{
- pair = new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector());
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()), 0);
}
else
{
- pair = new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null);
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null), 0);
}
- topology.put(nodeID, pair);
+ topology.addMember(nodeID, member);
}
else
{
if (backup)
{
- pair.b = cc.getConnector();
+ // pair.b = cc.getConnector();
}
else
{
- pair.a = cc.getConnector();
+ // pair.a = cc.getConnector();
}
}
@@ -351,12 +341,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, pair, false);
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, pair, false);
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
}
@@ -665,5 +655,4 @@
}
return transformer;
}
-
}
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.impl;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
+
+import java.lang.reflect.Array;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Aug 16, 2010
+ */
+public class Topology
+{
+ /*
+ * topology describes the other cluster nodes that this server knows about:
+ *
+ * keys are node IDs
+ * values are topology members (a live/backup transport configuration pair plus a distance)
+ */
+ private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+
+ public synchronized void addMember(String nodeId, TopologyMember member)
+ {
+ topology.put(nodeId, member);
+ }
+
+ public synchronized void removeMember(String nodeId)
+ {
+ topology.remove(nodeId);
+ }
+
+ public synchronized void fireListeners(ClusterTopologyListener listener)
+ {
+ int count = 0;
+ for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
+ {
+ listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+ }
+ }
+
+ public TopologyMember getMember(String nodeID)
+ {
+ return topology.get(nodeID);
+ }
+
+ public boolean isEmpty()
+ {
+ return topology.isEmpty();
+ }
+
+ public Collection<TopologyMember> getMembers()
+ {
+ return topology.values();
+ }
+
+ public int size()
+ {
+ return topology.size();
+ }
+}
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.impl;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Aug 16, 2010
+ */
+public class TopologyMember
+{
+ private final Pair<TransportConfiguration, TransportConfiguration> connector;
+
+ private final int distance;
+ public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
+ {
+ this.connector = connector;
+ this.distance = distance;
+ }
+
+ public Pair<TransportConfiguration, TransportConfiguration> getConnector()
+ {
+ return connector;
+ }
+
+ public int getDistance()
+ {
+ return distance;
+ }
+}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-08-16 12:42:35 UTC (rev 9549)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-08-16 12:53:50 UTC (rev 9550)
@@ -14,7 +14,13 @@
package org.hornetq.tests.integration.cluster.distribution;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
+import java.util.Map;
+import java.util.Set;
+
/**
* A OneWayChainClusterTest
*
@@ -337,4 +343,57 @@
verifyNotReceive(0, 1);
}
+ public void testChainClusterConnections() throws Exception
+ {
+ setupClusterConnection("cluster0-1", 0, 1, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster1-2", 1, 2, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster2-3", 2, 3, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster3-4", 3, 4, "queues", false, 4, isNetty());
+ setupClusterConnection("cluster4-X", 4, -1, "queues", false, 4, isNetty());
+
+ startServers(0, 1, 2, 3, 4);
+ Thread.sleep(2000);
+ Set<ClusterConnection> connectionSet = getServer(0).getClusterManager().getClusterConnections();
+ assertNotNull(connectionSet);
+ assertEquals(1, connectionSet.size());
+ ClusterConnectionImpl ccon = (ClusterConnectionImpl) connectionSet.iterator().next();
+
+ Map<String, MessageFlowRecord> records = ccon.getRecords();
+ assertNotNull(records);
+ assertEquals(records.size(), 1);
+ getServer(1).getClusterManager().getClusterConnections();
+ assertNotNull(connectionSet);
+ assertEquals(1, connectionSet.size());
+ ccon = (ClusterConnectionImpl) connectionSet.iterator().next();
+
+ records = ccon.getRecords();
+ assertNotNull(records);
+ assertEquals(records.size(), 1);
+ getServer(2).getClusterManager().getClusterConnections();
+ assertNotNull(connectionSet);
+ assertEquals(1, connectionSet.size());
+ ccon = (ClusterConnectionImpl) connectionSet.iterator().next();
+
+ records = ccon.getRecords();
+ assertNotNull(records);
+ assertEquals(records.size(), 1);
+ getServer(3).getClusterManager().getClusterConnections();
+ assertNotNull(connectionSet);
+ assertEquals(1, connectionSet.size());
+ ccon = (ClusterConnectionImpl) connectionSet.iterator().next();
+
+ records = ccon.getRecords();
+ assertNotNull(records);
+ assertEquals(records.size(), 1);
+
+ getServer(4).getClusterManager().getClusterConnections();
+ assertNotNull(connectionSet);
+ assertEquals(1, connectionSet.size());
+ ccon = (ClusterConnectionImpl) connectionSet.iterator().next();
+
+ records = ccon.getRecords();
+ assertNotNull(records);
+ assertEquals(records.size(), 1);
+ System.out.println("OneWayChainClusterTest.testChainClusterConnections");
+ }
}
13 years, 9 months
JBoss hornetq SVN: r9549 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-16 08:42:35 -0400 (Mon, 16 Aug 2010)
New Revision: 9549
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
remove noisy log info
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-08-13 20:33:24 UTC (rev 9548)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-08-16 12:42:35 UTC (rev 9549)
@@ -378,7 +378,6 @@
{
//We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
//before failure
- log.info("decrementing delivery count");
ref.decrementDeliveryCount();
}
13 years, 9 months
JBoss hornetq SVN: r9548 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-13 16:33:24 -0400 (Fri, 13 Aug 2010)
New Revision: 9548
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
HORNETQ-477 - Adding commented out tests
Modified: trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2010-08-13 18:42:27 UTC (rev 9547)
+++ trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2010-08-13 20:33:24 UTC (rev 9548)
@@ -180,6 +180,95 @@
session.close();
}
+
+ public void _testQueueWithWildcard() throws Exception
+ {
+ session.createQueue("a.b", "queue1");
+ session.createTemporaryQueue("a.#", "queue2");
+ session.createTemporaryQueue("a.#", "queue3");
+
+ ClientProducer producer = session.createProducer("a.b");
+ producer.send(session.createMessage(false));
+
+ ClientConsumer cons = session.createConsumer("queue2");
+
+ session.start();
+
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ cons.close();
+
+ cons = session.createConsumer("queue3");
+
+ session.start();
+
+ msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ cons.close();
+
+ session.deleteQueue("queue2");
+ session.deleteQueue("queue3");
+
+ session.close();
+ }
+
+
+ public void _testQueueWithWildcard2() throws Exception
+ {
+ session.createQueue("a.b", "queue1");
+ session.createTemporaryQueue("a.#", "queue2");
+ session.createTemporaryQueue("a.#", "queue3");
+
+ ClientProducer producer = session.createProducer("a.b");
+ producer.send(session.createMessage(false));
+
+ ClientConsumer cons = session.createConsumer("queue2");
+
+ session.start();
+
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ cons.close();
+
+ cons = session.createConsumer("queue3");
+
+ session.start();
+
+ msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ cons.close();
+
+ session.deleteQueue("queue2");
+ session.deleteQueue("queue3");
+
+ session.close();
+ }
+
+ public void _testQueueWithWildcard3() throws Exception
+ {
+ session.createQueue("a.b", "queue1");
+ session.createTemporaryQueue("a.#", "queue2");
+ session.createTemporaryQueue("a.#", "queue2.1");
+
+ session.deleteQueue("queue2");
+ }
+
/**
* @see org.hornetq.core.server.impl.ServerSessionImpl#doHandleCreateQueue(org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage)
*/
13 years, 9 months
JBoss hornetq SVN: r9547 - in trunk: src/main/org/hornetq/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-13 14:42:27 -0400 (Fri, 13 Aug 2010)
New Revision: 9547
Modified:
trunk/docs/user-manual/en/persistence.xml
trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
Log:
HORNETQ-180 - Import/Export tool documentation
Modified: trunk/docs/user-manual/en/persistence.xml
===================================================================
--- trunk/docs/user-manual/en/persistence.xml 2010-08-13 15:55:27 UTC (rev 9546)
+++ trunk/docs/user-manual/en/persistence.xml 2010-08-13 18:42:27 UTC (rev 9547)
@@ -67,14 +67,16 @@
<para>The AIO journal is only available when running Linux kernel 2.6 or later and after
having installed libaio (if it's not already installed). For instructions on how to
install libaio please see <xref linkend="installing-aio"/>.</para>
- <para>Also, please note that AIO will only work with the following file systems: ext2, ext3, ext4, jfs, xfs. With other file systems,
- e.g. NFS it may appear to work, but it will fall back to a slower sychronous behaviour. Don't put the journal on a NFS share!</para>
+ <para>Also, please note that AIO will only work with the following file systems: ext2,
+ ext3, ext4, jfs, xfs. With other file systems, e.g. NFS it may appear to work, but
+ it will fall back to a slower synchronous behaviour. Don't put the journal on an NFS
+ share!</para>
<para>For more information on libaio please see <xref linkend="libaio"/>.</para>
<para>libaio is part of the kernel project.</para>
</listitem>
</itemizedlist>
<para>The standard HornetQ core server uses two instances of the journal:</para>
- <itemizedlist>
+ <itemizedlist id="persistence.journallist">
<listitem>
<para>Bindings journal.</para>
<para>This journal is used to store bindings related data. That includes the set of
@@ -82,13 +84,21 @@
such as id sequence counters. </para>
<para>The bindings journal is always a NIO journal as it is typically low throughput
compared to the message journal.</para>
+ <para>The files on this journal are prefixed as <literal>hornetq-bindings</literal>.
+ Each file has a <literal>bindings</literal> extension. File size is <literal
+ >1048576</literal>, and it is located at the bindings folder.</para>
</listitem>
- <listitem>
+ <listitem>
<para>JMS journal.</para>
- <para>This journal instance stores all JMS related data, This is basically any JMS Queues, Topics and Connection
- Factories and any JNDI bindings for these resources.</para>
- <para>Any JMS Resources created via the management API will be persisted to this journal. Any resources
- configured via configuration files will not. The JMS Journal will only be created if JMS is being used.</para>
+ <para>This journal instance stores all JMS related data. This is basically any JMS
+ Queues, Topics and Connection Factories and any JNDI bindings for these
+ resources.</para>
+ <para>Any JMS Resources created via the management API will be persisted to this
+ journal. Any resources configured via configuration files will not. The JMS Journal
+ will only be created if JMS is being used.</para>
+ <para>The files on this journal are prefixed as <literal>hornetq-jms</literal>. Each
+ file has a <literal>jms</literal> extension. File size is <literal
+ >1048576</literal>, and it is located at the bindings folder.</para>
</listitem>
<listitem>
<para>Message journal.</para>
@@ -98,6 +108,10 @@
the platform is not Linux with the correct kernel version or AIO has not been
installed then it will automatically fall back to using Java NIO which is available
on any Java platform.</para>
+ <para>The files on this journal are prefixed as <literal>hornetq-data</literal>. Each
+ file has a <literal>hq</literal> extension. File size is by default <literal
+ >10485760</literal> (configurable), and it is located at the journal
+ folder.</para>
</listitem>
</itemizedlist>
<para>For large messages, HornetQ persists them outside the message journal. This is discussed
@@ -125,7 +139,7 @@
</listitem>
</itemizedlist>
</section>
- <section id="configuring.bindings.jms">
+ <section id="configuring.bindings.jms">
<title>Configuring the jms journal</title>
<para>The jms config shares its configuration with the bindings journal.</para>
</section>
@@ -299,4 +313,40 @@
persistence will occur. That means no bindings data, message data, large message data,
duplicate id caches or paging data will be persisted.</para>
</section>
+ <section id="persistence.importexport">
+ <title>Import/Export the Journal Data</title>
+ <para>You may want to inspect the existing records in each of the journals used by
+ HornetQ; you can use the export/import tool for that purpose. The export and import tools
+ are classes located in hornetq-core.jar. You can export the journal as a text file by
+ using this command:</para>
+ <para><literal>java -cp hornetq-core.jar org.hornetq.core.journal.impl.ExportJournal
+ <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>
+ <FileOutput></literal></para>
+ <para>To import the file as binary data on the journal (Notice you also require
+ netty.jar):</para>
+ <para><literal>java -cp hornetq-core.jar:netty.jar org.hornetq.core.journal.impl.ImportJournal
+ <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize>
+ <FileInput></literal></para>
+ <itemizedlist>
+ <listitem>
+ <para>JournalDirectory: Use the configured folder for your selected journal. Example:
+ ./hornetq/data/journal</para>
+ </listitem>
+ <listitem>
+ <para>JournalPrefix: Use the prefix for your selected journal, as discussed
+ <link linkend="persistence.journallist">here</link></para>
+ </listitem>
+ <listitem>
+ <para>FileExtension: Use the extension for your selected journal, as discussed
+ <link linkend="persistence.journallist">here</link></para>
+ </listitem>
+ <listitem>
+ <para>FileSize: Use the size for your selected journal, as discussed <link
+ linkend="persistence.journallist">here</link></para>
+ </listitem>
+ <listitem>
+ <para>FileOutput: text file that will contain the exported data</para>
+ </listitem>
+ </itemizedlist>
+ </section>
</chapter>
Modified: trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-13 15:55:27 UTC (rev 9546)
+++ trunk/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-13 18:42:27 UTC (rev 9547)
@@ -50,15 +50,15 @@
public static void main(String arg[])
{
- if (arg.length != 6)
+ if (arg.length != 5)
{
- System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.ExportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>");
+ System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.ExportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
return;
}
try
{
- exportJournal(arg[0], arg[1], arg[2], Integer.parseInt(arg[3]), Integer.parseInt(arg[4]), arg[5]);
+ exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-13 15:55:27 UTC (rev 9546)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-13 18:42:27 UTC (rev 9547)
@@ -54,15 +54,15 @@
public static void main(String arg[])
{
- if (arg.length != 6)
+ if (arg.length != 5)
{
- System.err.println("Use: java -cp hornetq-core.jar org.hornetq.core.journal.impl.ImportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <MinFiles> <FileSize> <FileOutput>");
+ System.err.println("Use: java -cp hornetq-core.jar:netty.jar org.hornetq.core.journal.impl.ImportJournal <JournalDirectory> <JournalPrefix> <FileExtension> <FileSize> <FileOutput>");
return;
}
try
{
- importJournal(arg[0], arg[1], arg[2], Integer.parseInt(arg[3]), Integer.parseInt(arg[4]), arg[5]);
+ importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
}
catch (Exception e)
{
13 years, 9 months
JBoss hornetq SVN: r9546 - trunk/src/main/org/hornetq/core/protocol/stomp.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-13 11:55:27 -0400 (Fri, 13 Aug 2010)
New Revision: 9546
Modified:
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
Log:
https://jira.jboss.org/browse/HORNETQ-481 - Each StompSession should have its own executor from the executorFactory
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-08-13 15:15:31 UTC (rev 9545)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-08-13 15:55:27 UTC (rev 9546)
@@ -454,7 +454,7 @@
StompSession stompSession = sessions.get(connection.getID());
if (stompSession == null)
{
- stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
+ stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
13 years, 9 months
JBoss hornetq SVN: r9545 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-13 11:15:31 -0400 (Fri, 13 Aug 2010)
New Revision: 9545
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
tweak
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-13 15:11:42 UTC (rev 9544)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-13 15:15:31 UTC (rev 9545)
@@ -1660,6 +1660,11 @@
// Replay pending commands (including updates, deletes and commits)
+ for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
+ {
+ newTransaction.replaceRecordProvider(this);
+ }
+
localCompactor.replayPendingCommands();
// Merge transactions back after compacting
@@ -1668,8 +1673,6 @@
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
{
- newTransaction.replaceRecordProvider(this);
-
if (JournalImpl.trace)
{
JournalImpl.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
13 years, 9 months
JBoss hornetq SVN: r9544 - trunk/tests/src/org/hornetq/tests/stress/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-13 11:11:42 -0400 (Fri, 13 Aug 2010)
New Revision: 9544
Modified:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
tweak
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-13 15:03:07 UTC (rev 9543)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-13 15:11:42 UTC (rev 9544)
@@ -129,15 +129,15 @@
{
try
{
- System.out.println("OnCompactSTart enter");
- for (int i = 0; i < 20; i++)
+ System.out.println("OnCompactStart enter");
+ if (running)
{
long id = idGen.generateID();
journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
journal.forceMoveNextFile();
journal.appendDeleteRecord(id, id == 20);
}
- System.out.println("OnCompactSTart leave");
+ System.out.println("OnCompactStart leave");
}
catch (Exception e)
{
13 years, 9 months
JBoss hornetq SVN: r9543 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-13 11:03:07 -0400 (Fri, 13 Aug 2010)
New Revision: 9543
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
HA refactoring
* do *not* notify that nodes are UP or DOWN when the discovery group notifies the server locator that its connector has changed
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-13 15:01:37 UTC (rev 9542)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-13 15:03:07 UTC (rev 9543)
@@ -426,9 +426,10 @@
initialise();
}
- public void connect(boolean backup, TransportConfiguration transportConfig)
+ public void connect()
{
- if (initialConnectors != null)
+ // static list of initial connectors
+ if (initialConnectors != null && discoveryGroup == null)
{
for (TransportConfiguration connector : initialConnectors)
{
@@ -454,6 +455,18 @@
while (sf == null);
}
}
+ // wait for discovery group to get the list of initial connectors
+ else
+ {
+ try
+ {
+ ClientSessionFactory sf = createSessionFactory();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
}
public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration) throws Exception
@@ -1123,6 +1136,7 @@
for (ClusterTopologyListener listener : topologyListeners)
{
+ System.out.println(this.nodeID + " notified that " + nodeID + " is UP");
listener.nodeUP(nodeID, connectorPair, last);
}
@@ -1158,8 +1172,6 @@
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
-
- notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true);
}
System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-13 15:01:37 UTC (rev 9542)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-13 15:03:07 UTC (rev 9543)
@@ -36,7 +36,7 @@
void setNodeID(String nodeID);
- void connect(boolean backup, TransportConfiguration transportConfig);
+ void connect();
void addClusterTopologyListener(ClusterTopologyListener listener);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-13 15:01:37 UTC (rev 9542)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-13 15:03:07 UTC (rev 9543)
@@ -173,13 +173,19 @@
{
serverLocator.addClusterTopologyListener(this);
serverLocator.start();
-
// FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
server.getExecutorFactory().getExecutor().execute(new Runnable()
{
public void run()
{
- serverLocator.connect(server.getConfiguration().isBackup(), connector);
+ try
+ {
+ serverLocator.connect();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
}
});
}
13 years, 9 months
JBoss hornetq SVN: r9542 - branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-13 11:01:37 -0400 (Fri, 13 Aug 2010)
New Revision: 9542
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
simplified test setup...
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-13 15:01:21 UTC (rev 9541)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-13 15:01:37 UTC (rev 9542)
@@ -1313,40 +1313,18 @@
configuration.setClustered(true);
configuration.setBackup(backup);
- TransportConfiguration nettyBackuptc = null;
-
configuration.getAcceptorConfigurations().clear();
Map<String, Object> params = generateParams(node, netty);
- if (netty)
- {
- TransportConfiguration nettytc = new TransportConfiguration(ServiceTestBase.NETTY_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(nettytc);
- }
- else
- {
- TransportConfiguration invmtc = new TransportConfiguration(ServiceTestBase.INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
- }
+ configuration.getAcceptorConfigurations().add(createTransportConfiguration(netty, true, params));
+ TransportConfiguration connector = createTransportConfiguration(netty, false, params);
+ configuration.getConnectorConfigurations().put(connector.getName(), connector);
+
List<String> connectorPairs = new ArrayList<String>();
+ connectorPairs.add(connector.getName());
- if (netty)
- {
- TransportConfiguration nettytc_c = new TransportConfiguration(ServiceTestBase.NETTY_CONNECTOR_FACTORY, params);
- configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
-
- connectorPairs.add(nettytc_c.getName());
- }
- else
- {
- TransportConfiguration invmtc_c = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
- configuration.getConnectorConfigurations().put(invmtc_c.getName(), invmtc_c);
-
- connectorPairs.add(invmtc_c.getName());
- }
-
BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
null,
-1,
13 years, 9 months
JBoss hornetq SVN: r9541 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-08-13 11:01:21 -0400 (Fri, 13 Aug 2010)
New Revision: 9541
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
clear the clusterConnections when the manager is stopped
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-13 13:45:27 UTC (rev 9540)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-13 15:01:21 UTC (rev 9541)
@@ -192,13 +192,16 @@
managementService.unregisterBroadcastGroup(group.getName());
}
+ broadcastGroups.clear();
+
for (ClusterConnection clusterConnection : clusterConnections.values())
{
clusterConnection.stop();
managementService.unregisterCluster(clusterConnection.getName().toString());
}
+
+ clusterConnections.clear();
- broadcastGroups.clear();
}
for (Bridge bridge : bridges.values())
13 years, 9 months