[hornetq-commits] JBoss hornetq SVN: r9527 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq/core: protocol/core/impl and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Aug 11 05:15:57 EDT 2010
Author: jmesnil
Date: 2010-08-11 05:15:56 -0400 (Wed, 11 Aug 2010)
New Revision: 9527
Added:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
HA refactoring
* various fixes
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -44,6 +44,7 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -426,7 +427,7 @@
initialise();
}
- public void connect()
+ public void connect(boolean backup, TransportConfiguration transportConfig)
{
if (initialConnectors != null)
{
@@ -438,6 +439,11 @@
try
{
sf = createSessionFactory(connector);
+ if (sf != null)
+ {
+ ClientSessionFactoryInternal internalSF = (ClientSessionFactoryInternal)sf;
+ internalSF.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeID, backup, transportConfig));
+ }
}
catch (HornetQException e)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -35,9 +35,9 @@
TransportConfiguration getBackup( TransportConfiguration live);
void setNodeID(String nodeID);
-
- void connect();
+ void connect(boolean backup, TransportConfiguration transportConfig);
+
void addClusterTopologyListener(ClusterTopologyListener listener);
void removeClusterTopologyListener(ClusterTopologyListener listener);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
@@ -118,6 +119,11 @@
{
channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
+
+ public String toString()
+ {
+ return "ClusterTopologyListener[address=" + connection.getRemoteAddress() + "]";
+ };
};
final boolean isCC = msg.isClusterConnection();
@@ -132,6 +138,22 @@
}
});
}
+ else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
+ {
+ NodeAnnounceMessage msg = (NodeAnnounceMessage)packet;
+ TransportConfiguration connector = msg.getConnector();
+ boolean backup = msg.isBackup();
+ Pair<TransportConfiguration, TransportConfiguration> pair;
+ if (backup)
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(null, connector);
+ }
+ else
+ {
+ pair = new Pair<TransportConfiguration, TransportConfiguration>(connector, null);
+ }
+ server.getClusterManager().announceNode(msg.getNodeID(), pair);
+ }
}
});
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -21,6 +21,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.DELETE_QUEUE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.DISCONNECT;
import static org.hornetq.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.NODE_ANNOUNCE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.NULL_RESPONSE;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PACKETS_CONFIRMED;
import static org.hornetq.core.protocol.core.impl.PacketImpl.PING;
@@ -90,6 +91,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
@@ -495,6 +497,11 @@
packet = new ClusterTopologyChangeMessage();
break;
}
+ case NODE_ANNOUNCE:
+ {
+ packet = new NodeAnnounceMessage();
+ break;
+ }
case SUBSCRIBE_TOPOLOGY:
{
packet = new SubscribeClusterTopologyUpdatesMessage();
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -185,7 +185,9 @@
// HA
public static final byte CLUSTER_TOPOLOGY = 110;
-
+
+ public static final byte NODE_ANNOUNCE = 111;
+
public static final byte SUBSCRIBE_TOPOLOGY = 112;
// Static --------------------------------------------------------
Added: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java (rev 0)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NodeAnnounceMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(NodeAnnounceMessage.class);
+
+ // Attributes ----------------------------------------------------
+
+ private String nodeID;
+
+ private boolean backup;
+
+ private TransportConfiguration connector;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public NodeAnnounceMessage(final String nodeID, final boolean backup, final TransportConfiguration tc)
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+
+ this.nodeID = nodeID;
+
+ this.backup = backup;
+
+ this.connector = tc;
+ }
+
+ public NodeAnnounceMessage()
+ {
+ super(PacketImpl.NODE_ANNOUNCE);
+ }
+
+ // Public --------------------------------------------------------
+
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public boolean isBackup()
+ {
+ return backup;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ buffer.writeBoolean(backup);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ this.nodeID = buffer.readString();
+ this.backup = buffer.readBoolean();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -18,6 +18,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -29,7 +30,7 @@
*
*
*/
-public interface ClusterConnection extends HornetQComponent
+public interface ClusterConnection extends HornetQComponent, ClusterTopologyListener
{
SimpleString getName();
@@ -56,4 +57,6 @@
// for debug
String description();
+
+ void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -16,7 +16,9 @@
import java.util.Map;
import java.util.Set;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
@@ -43,4 +45,10 @@
void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
void activate();
+
+ void notifyClientsNodeDown(String nodeID);
+
+ void notifyClientsNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean b);
+
+ void announceNode(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair);
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
@@ -65,7 +66,7 @@
// Attributes ----------------------------------------------------
- protected final ServerLocator serverLocator;
+ protected final ServerLocatorInternal serverLocator;
private final UUID nodeUUID;
@@ -109,7 +110,7 @@
// Public --------------------------------------------------------
- public BridgeImpl(final ServerLocator serverLocator,
+ public BridgeImpl(final ServerLocatorInternal serverLocator,
final UUID nodeUUID,
final SimpleString name,
final Queue queue,
@@ -589,6 +590,8 @@
{
queue.deliverAsync();
}
+
+ log.info("stopped bridge " + name);
}
catch (Exception e)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
@@ -63,8 +64,11 @@
private final TransportConfiguration connector;
- public ClusterConnectionBridge(final ServerLocator serverLocator,
+ private final String targetNodeID;
+
+ public ClusterConnectionBridge(final ServerLocatorInternal serverLocator,
final UUID nodeUUID,
+ final String targetNodeID,
final SimpleString name,
final Queue queue,
final Executor executor,
@@ -99,6 +103,7 @@
idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+ this.targetNodeID = targetNodeID;
this.managementAddress = managementAddress;
this.managementNotificationAddress = managementNotificationAddress;
this.flowRecord = flowRecord;
@@ -224,4 +229,20 @@
return serverLocator.createSessionFactory(connector);
}
+ @Override
+ public void connectionFailed(HornetQException me)
+ {
+ try
+ {
+ session.cleanUp();
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ serverLocator.notifyNodeDown(targetNodeID);
+ super.connectionFailed(me);
+ }
+
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -26,7 +26,6 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -58,7 +57,7 @@
*
*
*/
-public class ClusterConnectionImpl implements ClusterConnection, ClusterTopologyListener
+public class ClusterConnectionImpl implements ClusterConnection
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
@@ -129,7 +128,10 @@
this.serverLocator = serverLocator;
- this.serverLocator.setClusterConnection(true);
+ if (this.serverLocator != null)
+ {
+ this.serverLocator.setClusterConnection(true);
+ }
this.connector = connector;
@@ -167,17 +169,20 @@
return;
}
- serverLocator.addClusterTopologyListener(this);
- serverLocator.start();
-
- // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
- server.getExecutorFactory().getExecutor().execute(new Runnable()
+ if (serverLocator != null)
{
- public void run()
+ serverLocator.addClusterTopologyListener(this);
+ serverLocator.start();
+
+ // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+ server.getExecutorFactory().getExecutor().execute(new Runnable()
{
- serverLocator.connect();
- }
- });
+ public void run()
+ {
+ serverLocator.connect(server.getConfiguration().isBackup(), connector);
+ }
+ });
+ }
started = true;
@@ -198,9 +203,12 @@
{
return;
}
+
+ if (serverLocator != null)
+ {
+ serverLocator.removeClusterTopologyListener(this);
+ }
- serverLocator.removeClusterTopologyListener(this);
-
for (MessageFlowRecord record : records.values())
{
try
@@ -212,6 +220,12 @@
}
}
+ if (serverLocator != null)
+ {
+ serverLocator.close();
+ }
+
+
if (managementService != null)
{
TypedProperties props = new TypedProperties();
@@ -277,6 +291,13 @@
public synchronized void nodeDown(final String nodeID)
{
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ return;
+ }
+
+ server.getClusterManager().notifyClientsNodeDown(nodeID);
+
//Remove the flow record for that node
MessageFlowRecord record = records.remove(nodeID);
@@ -285,6 +306,7 @@
{
try
{
+ record.reset();
record.close();
}
catch (Exception e)
@@ -298,6 +320,13 @@
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ return;
+ }
+
+ server.getClusterManager().notifyClientsNodeUp(nodeID, connectorPair, false);
+
try
{
MessageFlowRecord record = records.get(nodeID);
@@ -339,6 +368,21 @@
log.error("Failed to update topology", e);
}
}
+
+ public void announce(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair, boolean b)
+ {
+ TransportConfiguration connector = (backup) ? pair.b : pair.a;
+ if (serverLocator!= null && serverLocator.getStaticTransportConfigurations() != null)
+ {
+ for (TransportConfiguration staticConnector : serverLocator.getStaticTransportConfigurations())
+ {
+ if (connector.equals(staticConnector))
+ {
+ nodeUP(nodeID, pair, false);
+ }
+ }
+ }
+ }
private void createNewRecord(final String nodeID,
final TransportConfiguration connector,
@@ -350,6 +394,7 @@
Bridge bridge = new ClusterConnectionBridge(serverLocator,
nodeUUID,
+ nodeID,
queueName,
queue,
executorFactory.getExecutor(),
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-11 09:14:09 UTC (rev 9526)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-08-11 09:15:56 UTC (rev 9527)
@@ -30,7 +30,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
@@ -69,8 +68,6 @@
private final Map<String, Bridge> bridges = new HashMap<String, Bridge>();
- private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
-
private final ExecutorFactory executorFactory;
private final HornetQServer server;
@@ -91,11 +88,23 @@
private final boolean clustered;
- // FIXME why do we distinguish between client listeners and cluster connection listeners?
- // They are both notified at the same time...
+ // the cluster connections which links this node to other cluster nodes
+ private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
+
+ // regular client listeners to be notified of cluster topology changes.
+ // they correspond to regular clients using a HA ServerLocator
private Set<ClusterTopologyListener> clientListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+
+ // cluster connections listeners to be notified of cluster topology changes
+ // they correspond to cluster connections on *other nodes connected to this one*
private Set<ClusterTopologyListener> clusterConnectionListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+ /*
+ * topology describes the other cluster nodes that this server knows about:
+ *
+ * keys are node IDs
+ * values are a pair of live/backup transport configurations
+ */
private Map<String, Pair<TransportConfiguration, TransportConfiguration>> topology = new HashMap<String, Pair<TransportConfiguration,TransportConfiguration>>();
public ClusterManagerImpl(final ExecutorFactory executorFactory,
@@ -203,6 +212,40 @@
started = false;
}
+ public void notifyClientsNodeDown(String nodeID)
+ {
+ topology.remove(nodeID);
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeDown(nodeID);
+ }
+ }
+
+ public void notifyClientsNodeUp(String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
+ {
+ topology.put(nodeID, connectorPair);
+
+ for (ClusterTopologyListener listener : clientListeners)
+ {
+ listener.nodeUP(nodeID, connectorPair, last);
+ }
+ }
+
+ public void announceNode(String nodeID, Pair<TransportConfiguration, TransportConfiguration> pair)
+ {
+ /*
+ System.out.println("node " + server.getNodeID() + " announces " + nodeID + " to its cluster connections " + clusterConnections.keySet());
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ clusterConnection.announce(nodeID, pair, false);
+ }
+ */
+
+ }
+
public boolean isStarted()
{
return started;
@@ -231,6 +274,7 @@
public synchronized void addClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
{
+ System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " " + clusterConnection + " " + listener);
if (clusterConnection)
{
this.clusterConnectionListeners.add(listener);
@@ -241,10 +285,10 @@
}
// We now need to send the current topology to the client
-
int count = 0;
for (Map.Entry<String, Pair<TransportConfiguration, TransportConfiguration>> entry : topology.entrySet())
{
+ System.out.println("ClusterManagerImpl.addClusterTopologyListener() on " + nodeUUID + " -- " + entry);
listener.nodeUP(entry.getKey(), entry.getValue(), ++count == topology.size());
}
}
@@ -318,7 +362,7 @@
{
listener.nodeUP(nodeID, pair, false);
}
-
+
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
listener.nodeUP(nodeID, pair, false);
@@ -454,7 +498,7 @@
Queue queue = (Queue)binding.getBindable();
- ServerLocator serverLocator;
+ ServerLocatorInternal serverLocator;
if (config.getDiscoveryGroupName() != null)
{
@@ -470,12 +514,12 @@
if (config.isHA())
{
- serverLocator = HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration.getGroupAddress(),
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(discoveryGroupConfiguration.getGroupAddress(),
discoveryGroupConfiguration.getGroupPort());
}
else
{
- serverLocator = HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration.getGroupAddress(),
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(discoveryGroupConfiguration.getGroupAddress(),
discoveryGroupConfiguration.getGroupPort());
}
@@ -491,11 +535,11 @@
if (config.isHA())
{
- serverLocator = HornetQClient.createServerLocatorWithHA(tcConfigs);
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
}
else
{
- serverLocator = HornetQClient.createServerLocatorWithoutHA(tcConfigs);
+ serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
}
}
@@ -561,8 +605,9 @@
TransportConfiguration[] tcConfigs = connectorNameListToArray(config.getStaticConnectors());
serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tcConfigs);
+ serverLocator.setNodeID(nodeUUID.toString());
}
- else
+ else if (config.getDiscoveryGroupName() != null)
{
DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations()
.get(config.getDiscoveryGroupName());
@@ -574,9 +619,15 @@
}
serverLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(dg.getGroupAddress(), dg.getGroupPort());
+ serverLocator.setNodeID(nodeUUID.toString());
}
+ else
+ {
+ // no connector or discovery group are defined. The cluster connection will only be a target and will
+ // no connect to other nodes in the cluster
+ serverLocator = null;
+ }
- serverLocator.setNodeID(nodeUUID.toString());
ClusterConnection clusterConnection = new ClusterConnectionImpl(serverLocator,
connector,
new SimpleString(config.getName()),
More information about the hornetq-commits
mailing list