[hornetq-commits] JBoss hornetq SVN: r9565 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/cluster/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Aug 19 07:42:36 EDT 2010
Author: jmesnil
Date: 2010-08-19 07:42:35 -0400 (Thu, 19 Aug 2010)
New Revision: 9565
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Removed:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
Log:
HA refactoring
* improvements for discovery, cluster formation using node notifications, etc.
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -43,6 +43,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.FailureListener;
@@ -1072,6 +1073,11 @@
if (serverLocator.isHA())
{
channel0.send(new SubscribeClusterTopologyUpdatesMessage(serverLocator.isClusterConnection()));
+ if (serverLocator.isClusterConnection())
+ {
+ TransportConfiguration config = serverLocator.getClusterTransportConfiguration();
+ channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(), serverLocator.isBackup(), config));
+ }
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -18,7 +18,6 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -44,9 +43,6 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.cluster.impl.Topology;
-import org.hornetq.core.server.cluster.impl.TopologyMember;
-import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -166,6 +162,10 @@
private String groupID;
private String nodeID;
+
+ private TransportConfiguration clusterTransportConfiguration;
+
+ private boolean backup;
private static synchronized ExecutorService getGlobalThreadPool()
{
@@ -504,7 +504,7 @@
interceptors);
factories.add(factory);
-
+
return factory;
}
@@ -1008,6 +1008,11 @@
{
this.nodeID = nodeID;
}
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
public void setClusterConnection(boolean clusterConnection)
{
@@ -1018,7 +1023,27 @@
{
return clusterConnection;
}
+
+ public TransportConfiguration getClusterTransportConfiguration()
+ {
+ return clusterTransportConfiguration;
+ }
+
+ public void setClusterTransportConfiguration(TransportConfiguration tc)
+ {
+ this.clusterTransportConfiguration = tc;
+ }
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public void setBackup(boolean backup)
+ {
+ this.backup = backup;
+ }
+
@Override
protected void finalize() throws Throwable
{
@@ -1091,31 +1116,43 @@
closed = true;
}
- public synchronized void notifyNodeDown(final String nodeID)
+ public void notifyNodeDown(final String nodeID)
{
- if (!ha)
+ boolean removed = false;
+ synchronized (this)
{
- return;
- }
+ if (!ha)
+ {
+ return;
+ }
- topology.removeMember(nodeID);
+ removed = topology.removeMember(nodeID);
+
+ if (!topology.isEmpty())
+ {
+ updateArraysAndPairs();
+
+ if (topology.size() == 1 && topology.getMember(nodeID) != null)
+ {
+ receivedTopology = false;
+ }
+ }
+ else
+ {
+ pairs.clear();
- if (!topology.isEmpty())
- {
- updateArraysAndPairs();
- }
- else
- {
- pairs.clear();
+ topologyArray = null;
- topologyArray = null;
-
- receivedTopology = false;
+ receivedTopology = false;
+ }
}
- for (ClusterTopologyListener listener : topologyListeners)
+ if (removed)
{
- listener.nodeDown(nodeID);
+ for (ClusterTopologyListener listener : topologyListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
}
}
@@ -1144,7 +1181,6 @@
}
// Notify if waiting on getting topology
-
notify();
}
@@ -1175,11 +1211,14 @@
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
-
- notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true, 1);
}
-
- System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
+
+ if (clusterConnection && !receivedTopology)
+ {
+ // FIXME the node is alone in the cluster. We create a connection to the new node
+ // to trigger the node notification to form the cluster.
+ connect();
+ }
}
public synchronized void factoryClosed(final ClientSessionFactory factory)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -35,7 +35,9 @@
TransportConfiguration getBackup( TransportConfiguration live);
void setNodeID(String nodeID);
-
+
+ String getNodeID();
+
void connect();
void addClusterTopologyListener(ClusterTopologyListener listener);
@@ -49,4 +51,13 @@
void setClusterConnection(boolean clusterConnection);
boolean isClusterConnection();
+
+ TransportConfiguration getClusterTransportConfiguration();
+
+ void setClusterTransportConfiguration(TransportConfiguration tc);
+
+ boolean isBackup();
+
+ void setBackup(boolean backup);
+
}
Copied: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java (from rev 9550, branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java)
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.client.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.client.ClusterTopologyListener;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * Created Aug 16, 2010
+ */
+public class Topology
+{
+ /*
+ * topology describes the other cluster nodes that this server knows about:
+ *
+ * keys are node IDs
+ * values are a pair of live/backup transport configurations
+ */
+ private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
+
+ public synchronized boolean addMember(String nodeId, TopologyMember member)
+ {
+ boolean replaced = topology.containsKey(nodeId);
+ topology.put(nodeId, member);
+ return replaced;
+ }
+
+ public synchronized boolean removeMember(String nodeId)
+ {
+ TopologyMember member = topology.remove(nodeId);
+ return (member != null);
+ }
+
+ public synchronized void fireListeners(ClusterTopologyListener listener)
+ {
+ int count = 0;
+ for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
+ {
+ listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+ }
+ }
+
+ public TopologyMember getMember(String nodeID)
+ {
+ return topology.get(nodeID);
+ }
+
+ public boolean isEmpty()
+ {
+ return topology.isEmpty();
+ }
+
+ public Collection<TopologyMember> getMembers()
+ {
+ return topology.values();
+ }
+
+ public int size()
+ {
+ return topology.size();
+ }
+
+ public String describe()
+ {
+
+ String desc = "";
+ for (Entry<String, TopologyMember> entry : new HashMap<String, TopologyMember>(topology).entrySet())
+ {
+ desc += "\t" + entry.getKey() + " => " + entry.getValue() + "\n";
+ }
+ return desc;
+ }
+
+ public void clear()
+ {
+ topology.clear();
+ }
+}
Copied: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java (from rev 9550, branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java)
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/TopologyMember.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.client.impl;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.TransportConfiguration;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * Created Aug 16, 2010
+ */
+public class TopologyMember
+{
+ private final Pair<TransportConfiguration, TransportConfiguration> connector;
+
+ private final int distance;
+
+ public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
+ {
+ this.connector = connector;
+ this.distance = distance;
+ }
+
+ public Pair<TransportConfiguration, TransportConfiguration> getConnector()
+ {
+ return connector;
+ }
+
+ public int getDistance()
+ {
+ return distance;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TopologyMember[distance=" + distance + ", connector=" + connector + "]";
+ }
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -413,7 +413,6 @@
if (entry.getValue().getLastUpdate() + timeout <= now)
{
- System.out.println("remove " + entry);
iter.remove();
changed = true;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
@@ -118,11 +119,6 @@
{
channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
-
- public String toString()
- {
- return "ClusterTopologyListener[address=" + connection.getRemoteAddress() + "]";
- };
};
final boolean isCC = msg.isClusterConnection();
@@ -137,6 +133,21 @@
}
});
}
+ else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
+ {
+ NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
+
+ Pair<TransportConfiguration, TransportConfiguration> pair;
+ if (msg.isBackup())
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(null, msg.getConnector());
+ }
+ else
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
+ }
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
+ }
}
});
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -21,6 +21,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -90,6 +91,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -495,6 +497,11 @@
packet = new ClusterTopologyChangeMessage();
break;
}
+ case NODE_ANNOUNCE:
+ {
+ packet = new NodeAnnounceMessage();
+ break;
+ }
case SUBSCRIBE_TOPOLOGY:
{
packet = new SubscribeClusterTopologyUpdatesMessage();
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -186,6 +186,8 @@
public static final byte CLUSTER_TOPOLOGY = 110;
+ public static final byte NODE_ANNOUNCE = 111;
+
public static final byte SUBSCRIBE_TOPOLOGY = 112;
// Static --------------------------------------------------------
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NodeAnnounceMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
+
+ // Attributes ----------------------------------------------------
+
+ private String nodeID;
+
+ private boolean backup;
+
+ private TransportConfiguration connector;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+
+ this.nodeID = nodeID;
+
+ this.backup = backup;
+
+ this.connector = tc;
+ }
+
+ public NodeAnnounceMessage()
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ buffer.writeBoolean(backup);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ this.nodeID = buffer.readString();
+ this.backup = buffer.readBoolean();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -15,7 +15,6 @@
import java.util.Map;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -51,12 +50,8 @@
void activate();
- Pair<TransportConfiguration, TransportConfiguration>[] getTopology();
-
TransportConfiguration getConnector();
// for debug
String description();
-
- void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -20,6 +20,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -46,7 +47,9 @@
void activate();
- void notifyClientsNodeDown(String nodeID);
+ void notifyNodeDown(String nodeID);
- void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b, int distance);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
+
+ Topology getTopology();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -25,7 +25,6 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
@@ -238,8 +237,7 @@
}
catch (Exception e)
{
- // TODO Auto-generated catch block
- e.printStackTrace();
+ log.warn("Unable to clean up the session after a connection failure", e);
}
serverLocator.notifyNodeDown(targetNodeID);
super.connectionFailed(me);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -93,12 +93,12 @@
private final String clusterPassword;
- private Pair<TransportConfiguration, TransportConfiguration>[] topology;
-
private final ServerLocatorInternal serverLocator;
private final TransportConfiguration connector;
+ private final boolean allowsDirectConnectionsOnly;
+
public ClusterConnectionImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration connector,
final SimpleString name,
@@ -131,6 +131,15 @@
if (this.serverLocator != null)
{
this.serverLocator.setClusterConnection(true);
+ this.serverLocator.setClusterTransportConfiguration(connector);
+ this.serverLocator.setBackup(server.getConfiguration().isBackup());
+
+ // a cluster connection will connect to other nodes only if they are directly connected
+ // through a static list of connectors
+ allowsDirectConnectionsOnly = (serverLocator.getStaticTransportConfigurations() != null);
+ } else
+ {
+ allowsDirectConnectionsOnly = false;
}
this.connector = connector;
@@ -245,11 +254,6 @@
started = false;
}
- public Pair<TransportConfiguration, TransportConfiguration>[] getTopology()
- {
- return topology;
- }
-
public boolean isStarted()
{
return started;
@@ -302,8 +306,6 @@
return;
}
- server.getClusterManager().notifyClientsNodeDown(nodeID);
-
//Remove the flow record for that node
MessageFlowRecord record = records.remove(nodeID);
@@ -313,13 +315,15 @@
try
{
record.reset();
- record.close();
+ //record.close();
}
catch (Exception e)
{
log.error("Failed to close flow record", e);
}
}
+
+ server.getClusterManager().notifyNodeDown(nodeID);
}
public synchronized void nodeUP(final String nodeID,
@@ -327,14 +331,28 @@
final boolean last,
final int distance)
{
- //we only create a bridge it it isnt ourselves and the node is 1hop away
- if (nodeID.equals(nodeUUID.toString()) || distance > 1)
+ // discard notifications about ourselves
+ if (nodeID.equals(nodeUUID.toString()))
{
return;
}
-
- server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false, distance);
-
+
+ // we propagate the node notifications to all cluster topology listeners
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+
+ // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
+ if (allowsDirectConnectionsOnly && distance > 1)
+ {
+ return;
+ }
+
+ // FIXME required to prevent cluster connections w/o discovery group
+ // and empty static connectors to create bridges... ulgy!
+ if (serverLocator == null)
+ {
+ return;
+ }
+
try
{
MessageFlowRecord record = records.get(nodeID);
@@ -377,21 +395,6 @@
}
}
- public void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b)
- {
- TransportConfiguration connector = (backup) ? pair.b : pair.a;
- if (serverLocator!= null && serverLocator.getStaticTransportConfigurations() != null)
- {
- for (TransportConfiguration staticConnector : serverLocator.getStaticTransportConfigurations())
- {
- if (connector.equals(staticConnector))
- {
- nodeUP(nodeID, pair, false, 0);
- }
- }
- }
- }
-
private void createNewRecord(final String nodeID,
final TransportConfiguration connector,
final SimpleString queueName,
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -25,12 +25,16 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import apple.awt.CList;
+
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.Topology;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -194,7 +198,10 @@
managementService.unregisterCluster(clusterConnection.getName().toString());
}
+ clusterConnectionListeners.clear();
+ clientListeners.clear();
clusterConnections.clear();
+ topology.clear();
}
@@ -209,27 +216,56 @@
started = false;
}
- public void notifyClientsNodeDown(String nodeID)
+ public void notifyNodeDown(String nodeID)
{
- topology.removeMember(nodeID);
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ return;
+ }
- for (ClusterTopologyListener listener : clientListeners)
+ boolean removed = topology.removeMember(nodeID);
+
+ if (removed)
{
- listener.nodeDown(nodeID);
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
}
}
- public void notifyClientsNodeUp(String nodeID,
+ public void notifyNodeUp(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
boolean last,
int distance)
{
- topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ return;
+ }
+
+ boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+ if (distance >= topology.size() || updated)
+ {
+ return;
+ }
+
for (ClusterTopologyListener listener : clientListeners)
{
listener.nodeUP(nodeID, connectorPair, last, distance);
}
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, last, distance);
+ }
}
public boolean isStarted()
@@ -260,7 +296,6 @@
public synchronized void addClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
{
- System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " " + clusterConnection + " " + listener);
if (clusterConnection)
{
this.clusterConnectionListeners.add(listener);
@@ -287,6 +322,11 @@
}
}
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
// backup node becomes live
public synchronized void activate()
{
@@ -609,7 +649,7 @@
serverLocator = null;
}
- ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
+ ClusterConnectionImpl clusterConnection = new ClusterConnectionImpl(serverLocator,
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/Topology.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -1,76 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.cluster.impl;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-
-import java.lang.reflect.Array;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * Created Aug 16, 2010
- */
-public class Topology
-{
- /*
- * topology describes the other cluster nodes that this server knows about:
- *
- * keys are node IDs
- * values are a pair of live/backup transport configurations
- */
- private Map<String, TopologyMember> topology = new HashMap<String, TopologyMember>();
-
- public synchronized void addMember(String nodeId, TopologyMember member)
- {
- topology.put(nodeId, member);
- }
-
- public synchronized void removeMember(String nodeId)
- {
- topology.remove(nodeId);
- }
-
- public synchronized void fireListeners(ClusterTopologyListener listener)
- {
- int count = 0;
- for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
- {
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
- }
- }
-
- public TopologyMember getMember(String nodeID)
- {
- return topology.get(nodeID);
- }
-
- public boolean isEmpty()
- {
- return topology.isEmpty();
- }
-
- public Collection<TopologyMember> getMembers()
- {
- return topology.values();
- }
-
- public int size()
- {
- return topology.size();
- }
-}
Deleted: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/TopologyMember.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -1,42 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.cluster.impl;
-
-import org.hornetq.api.core.Pair;
-import org.hornetq.api.core.TransportConfiguration;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * Created Aug 16, 2010
- */
-public class TopologyMember
-{
- private final Pair<TransportConfiguration, TransportConfiguration> connector;
-
- private final int distance;
- public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
- {
- this.connector = connector;
- this.distance = distance;
- }
-
- public Pair<TransportConfiguration, TransportConfiguration> getConnector()
- {
- return connector;
- }
-
- public int getDistance()
- {
- return distance;
- }
-}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -760,6 +760,8 @@
out += cc.description() + "\n";
}
}
+ out += "\n\nfull topology:";
+ out += clusterManager.getTopology().describe();
return out + br;
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -189,6 +189,12 @@
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+
waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 4, 4, false);
waitForBindings(2, "queues.testaddress", 4, 4, false);
@@ -1326,6 +1332,11 @@
public void testStartStopServers() throws Exception
{
+ doTestStartStopServers(1, 3000);
+ }
+
+ public void doTestStartStopServers(long pauseBeforeServerRestarts, long pauseAfterServerRestarts) throws Exception
+ {
setupCluster();
startServers();
@@ -1412,6 +1423,15 @@
waitForBindings(3, "queues.testaddress", 6, 6, true);
waitForBindings(4, "queues.testaddress", 7, 7, true);
+ Thread.sleep(2000);
+ System.out.println("#####################################");
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+ System.out.println(clusterDescription(servers[2]));
+ System.out.println(clusterDescription(servers[3]));
+ System.out.println(clusterDescription(servers[4]));
+ System.out.println("#####################################");
+
waitForBindings(0, "queues.testaddress", 23, 23, false);
waitForBindings(1, "queues.testaddress", 23, 23, false);
waitForBindings(2, "queues.testaddress", 23, 23, false);
@@ -1455,9 +1475,11 @@
System.out.println(clusterDescription(servers[4]));
System.out.println("#####################################");
+ Thread.sleep(pauseBeforeServerRestarts);
+
startServers(3, 0);
- Thread.sleep(3000);
+ Thread.sleep(pauseAfterServerRestarts);
setupSessionFactory(0, isNetty());
setupSessionFactory(3, isNetty());
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithDiscoveryTest.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -98,193 +98,7 @@
*/
public void testStartStopServersWithPauseBeforeRestarting() throws Exception
{
- setupCluster();
-
- startServers();
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(1, isNetty());
- setupSessionFactory(2, isNetty());
- setupSessionFactory(3, isNetty());
- setupSessionFactory(4, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, false);
- createQueue(1, "queues.testaddress", "queue1", null, false);
- createQueue(2, "queues.testaddress", "queue2", null, false);
- createQueue(3, "queues.testaddress", "queue3", null, false);
- createQueue(4, "queues.testaddress", "queue4", null, false);
-
- createQueue(0, "queues.testaddress", "queue5", null, false);
- createQueue(1, "queues.testaddress", "queue6", null, false);
- createQueue(2, "queues.testaddress", "queue7", null, false);
- createQueue(3, "queues.testaddress", "queue8", null, false);
- createQueue(4, "queues.testaddress", "queue9", null, false);
-
- createQueue(0, "queues.testaddress", "queue10", null, false);
- createQueue(1, "queues.testaddress", "queue11", null, false);
- createQueue(2, "queues.testaddress", "queue12", null, false);
- createQueue(3, "queues.testaddress", "queue13", null, false);
- createQueue(4, "queues.testaddress", "queue14", null, false);
-
- createQueue(0, "queues.testaddress", "queue15", null, false);
- createQueue(1, "queues.testaddress", "queue15", null, false);
- createQueue(2, "queues.testaddress", "queue15", null, false);
- createQueue(3, "queues.testaddress", "queue15", null, false);
- createQueue(4, "queues.testaddress", "queue15", null, false);
-
- createQueue(2, "queues.testaddress", "queue16", null, false);
- createQueue(3, "queues.testaddress", "queue16", null, false);
- createQueue(4, "queues.testaddress", "queue16", null, false);
-
- createQueue(0, "queues.testaddress", "queue17", null, false);
- createQueue(1, "queues.testaddress", "queue17", null, false);
- createQueue(4, "queues.testaddress", "queue17", null, false);
-
- createQueue(3, "queues.testaddress", "queue18", null, false);
- createQueue(4, "queues.testaddress", "queue18", null, false);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(1, 1, "queue1", null);
- addConsumer(2, 2, "queue2", null);
- addConsumer(3, 3, "queue3", null);
- addConsumer(4, 4, "queue4", null);
-
- addConsumer(5, 0, "queue5", null);
- addConsumer(6, 1, "queue6", null);
- addConsumer(7, 2, "queue7", null);
- addConsumer(8, 3, "queue8", null);
- addConsumer(9, 4, "queue9", null);
-
- addConsumer(10, 0, "queue10", null);
- addConsumer(11, 1, "queue11", null);
- addConsumer(12, 2, "queue12", null);
- addConsumer(13, 3, "queue13", null);
- addConsumer(14, 4, "queue14", null);
-
- addConsumer(15, 0, "queue15", null);
- addConsumer(16, 1, "queue15", null);
- addConsumer(17, 2, "queue15", null);
- addConsumer(18, 3, "queue15", null);
- addConsumer(19, 4, "queue15", null);
-
- addConsumer(20, 2, "queue16", null);
- addConsumer(21, 3, "queue16", null);
- addConsumer(22, 4, "queue16", null);
-
- addConsumer(23, 0, "queue17", null);
- addConsumer(24, 1, "queue17", null);
- addConsumer(25, 4, "queue17", null);
-
- addConsumer(26, 3, "queue18", null);
- addConsumer(27, 4, "queue18", null);
-
- waitForBindings(0, "queues.testaddress", 5, 5, true);
- waitForBindings(1, "queues.testaddress", 5, 5, true);
- waitForBindings(2, "queues.testaddress", 5, 5, true);
- waitForBindings(3, "queues.testaddress", 6, 6, true);
- waitForBindings(4, "queues.testaddress", 7, 7, true);
-
- waitForBindings(0, "queues.testaddress", 23, 23, false);
- waitForBindings(1, "queues.testaddress", 23, 23, false);
- waitForBindings(2, "queues.testaddress", 23, 23, false);
- waitForBindings(3, "queues.testaddress", 22, 22, false);
- waitForBindings(4, "queues.testaddress", 21, 21, false);
-
- send(0, "queues.testaddress", 10, false, null);
-
- verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
-
- verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
-
- verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
-
- verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
-
- verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
-
- removeConsumer(0);
- removeConsumer(5);
- removeConsumer(10);
- removeConsumer(15);
- removeConsumer(23);
- removeConsumer(3);
- removeConsumer(8);
- removeConsumer(13);
- removeConsumer(18);
- removeConsumer(21);
- removeConsumer(26);
-
- closeSessionFactory(0);
- closeSessionFactory(3);
-
- stopServers(0, 3);
-
- Thread.sleep(10000);
-
- startServers(3, 0);
-
- setupSessionFactory(0, isNetty());
- setupSessionFactory(3, isNetty());
-
- createQueue(0, "queues.testaddress", "queue0", null, false);
- createQueue(3, "queues.testaddress", "queue3", null, false);
-
- createQueue(0, "queues.testaddress", "queue5", null, false);
- createQueue(3, "queues.testaddress", "queue8", null, false);
-
- createQueue(0, "queues.testaddress", "queue10", null, false);
- createQueue(3, "queues.testaddress", "queue13", null, false);
-
- createQueue(0, "queues.testaddress", "queue15", null, false);
- createQueue(3, "queues.testaddress", "queue15", null, false);
-
- createQueue(3, "queues.testaddress", "queue16", null, false);
-
- createQueue(0, "queues.testaddress", "queue17", null, false);
-
- createQueue(3, "queues.testaddress", "queue18", null, false);
-
- addConsumer(0, 0, "queue0", null);
- addConsumer(3, 3, "queue3", null);
-
- addConsumer(5, 0, "queue5", null);
- addConsumer(8, 3, "queue8", null);
-
- addConsumer(10, 0, "queue10", null);
- addConsumer(13, 3, "queue13", null);
-
- addConsumer(15, 0, "queue15", null);
- addConsumer(18, 3, "queue15", null);
-
- addConsumer(21, 3, "queue16", null);
-
- addConsumer(23, 0, "queue17", null);
-
- addConsumer(26, 3, "queue18", null);
-
- waitForBindings(0, "queues.testaddress", 5, 5, true);
- waitForBindings(1, "queues.testaddress", 5, 5, true);
- waitForBindings(2, "queues.testaddress", 5, 5, true);
- waitForBindings(3, "queues.testaddress", 6, 6, true);
- waitForBindings(4, "queues.testaddress", 7, 7, true);
-
- waitForBindings(0, "queues.testaddress", 23, 23, false);
- waitForBindings(1, "queues.testaddress", 23, 23, false);
- waitForBindings(2, "queues.testaddress", 23, 23, false);
- waitForBindings(3, "queues.testaddress", 22, 22, false);
- waitForBindings(4, "queues.testaddress", 21, 21, false);
-
- send(0, "queues.testaddress", 10, false, null);
-
- verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
-
- verifyReceiveRoundRobinInSomeOrder(10, 15, 16, 17, 18, 19);
-
- verifyReceiveRoundRobinInSomeOrder(10, 20, 21, 22);
-
- verifyReceiveRoundRobinInSomeOrder(10, 23, 24, 25);
-
- verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
+ doTestStartStopServers(10000, 3000);
}
}
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-08-18 19:29:30 UTC (rev 9564)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-08-19 11:42:35 UTC (rev 9565)
@@ -92,7 +92,39 @@
stopServers(0, 1);
}
+
+ public void testStartPauseStartOther() throws Exception
+ {
+ startServers(0);
+
+ setupSessionFactory(0, isNetty());
+ createQueue(0, "queues", "queue0", null, false);
+ addConsumer(0, 0, "queue0", null);
+
+ // we let the discovery initial timeout expire,
+ // #0 will be alone in the cluster
+ Thread.sleep(12000);
+
+ startServers(1);
+ setupSessionFactory(1, isNetty());
+ createQueue(1, "queues", "queue0", null, false);
+
+ addConsumer(1, 1, "queue0", null);
+
+ waitForBindings(0, "queues", 1, 1, true);
+ waitForBindings(1, "queues", 1, 1, true);
+
+ waitForBindings(0, "queues", 1, 1, false);
+ waitForBindings(1, "queues", 1, 1, false);
+
+ send(0, "queues", 10, false, null);
+ verifyReceiveRoundRobin(10, 0, 1);
+ verifyNotReceive(0, 1);
+
+ stopServers(0, 1);
+ }
+
public void testStopStart() throws Exception
{
startServers(0, 1);
@@ -127,7 +159,12 @@
System.out.println(clusterDescription(servers[0]));
startServers(1);
+
+ Thread.sleep(3000);
+ System.out.println(clusterDescription(servers[0]));
+ System.out.println(clusterDescription(servers[1]));
+
setupSessionFactory(1, isNetty());
createQueue(1, "queues", "queue0", null, false);
@@ -137,9 +174,6 @@
waitForBindings(0, "queues", 1, 1, true);
waitForBindings(1, "queues", 1, 1, true);
- System.out.println(clusterDescription(servers[0]));
- System.out.println(clusterDescription(servers[1]));
-
waitForBindings(1, "queues", 1, 1, false);
waitForBindings(0, "queues", 1, 1, false);
More information about the hornetq-commits
mailing list