Author: jmesnil
Date: 2010-07-22 09:01:25 -0400 (Thu, 22 Jul 2010)
New Revision: 9455
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA refactoring
* fix receiving list of initial connectors when using static connectors
* clean up ServerLocator interface and move topology-related methods to
ServerLocatorInternal
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
@@ -656,14 +655,6 @@
* Closes this factory and release all its resources
*/
void close();
-
- void registerTopologyListener(ClusterTopologyListener listener);
-
- void unregisterTopologyListener(ClusterTopologyListener listener);
-
- void notifyNodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last);
-
- void notifyNodeDown(String nodeID);
boolean isHA();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -1160,7 +1160,7 @@
}
else
{
- serverLocator.notifyNodeUP(topMessage.getNodeID(), topMessage.getPair(),
topMessage.isLast());
+ serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(),
topMessage.isLast());
}
}
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -297,16 +297,6 @@
discoveryGroup.start();
}
-
- if (initialConnectors != null)
- {
- System.out.println(">>>>>>>> Static initial
connectors = " + Arrays.asList(initialConnectors));
- for (int i = 0; i < initialConnectors.length; i++)
- {
- // FIXME and now what do I do?
- TransportConfiguration connector = initialConnectors[i];
- }
- }
readOnly = true;
}
@@ -432,6 +422,36 @@
initialise();
}
+ public void connect()
+ {
+ if (initialConnectors != null)
+ {
+ for (TransportConfiguration connector : initialConnectors)
+ {
+ ClientSessionFactory sf = null;
+ do
+ {
+ try
+ {
+ sf = createSessionFactory(connector);
+ }
+ catch (HornetQException e)
+ {
+ if (e.getCode() == HornetQException.NOT_CONNECTED)
+ {
+ continue;
+ }
+ }
+ catch (Exception e)
+ {
+ break;
+ }
+ }
+ while (sf == null);
+ }
+ }
+ }
+
public ClientSessionFactory createSessionFactory(final TransportConfiguration
transportConfiguration) throws Exception
{
if (closed)
@@ -1069,7 +1089,7 @@
}
}
- public synchronized void notifyNodeUP(final String nodeID,
+ public synchronized void notifyNodeUp(final String nodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1125,7 +1145,7 @@
{
this.initialConnectors[count++] = entry.getConnector();
- notifyNodeUP(entry.getNodeID(), new Pair<TransportConfiguration,
TransportConfiguration>(entry.getConnector(), null), true);
+ notifyNodeUp(entry.getNodeID(), new Pair<TransportConfiguration,
TransportConfiguration>(entry.getConnector(), null), true);
}
System.out.println(">>>>>>>> Discovered initial
connectors= " + Arrays.asList(initialConnectors));
@@ -1146,12 +1166,12 @@
}
}
- public void registerTopologyListener(final ClusterTopologyListener listener)
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.add(listener);
}
- public void unregisterTopologyListener(final ClusterTopologyListener listener)
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.remove(listener);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -13,8 +13,10 @@
package org.hornetq.core.client.impl;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
/**
@@ -34,4 +36,13 @@
void setNodeID(String nodeID);
+ void connect();
+
+ void addClusterTopologyListener(ClusterTopologyListener listener);
+
+ void removeClusterTopologyListener(ClusterTopologyListener listener);
+
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last);
+
+ void notifyNodeDown(String nodeID);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -30,7 +30,6 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
import org.hornetq.core.remoting.CloseListener;
@@ -123,13 +122,13 @@
final boolean isCC = msg.isClusterConnection();
- server.getClusterManager().registerTopologyListener(listener, isCC);
+ server.getClusterManager().addClusterTopologyListener(listener, isCC);
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
{
- server.getClusterManager().unregisterTopologyListener(listener,
isCC);
+ server.getClusterManager().removeClusterTopologyListener(listener,
isCC);
}
});
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -53,4 +53,7 @@
Pair<TransportConfiguration, TransportConfiguration>[] getTopology();
TransportConfiguration getConnector();
+
+ // for debug
+ String description();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -16,9 +16,7 @@
import java.util.Map;
import java.util.Set;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
@@ -40,13 +38,9 @@
Set<BroadcastGroup> getBroadcastGroups();
- void notifyNodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last);
+ void addClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
- void notifyNodeDown(String nodeID);
-
- void registerTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
+ void removeClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
- void unregisterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
-
void activate();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -485,7 +485,7 @@
do
{
- BridgeImpl.log.info("Connecting bridge " + name + " to its
destination");
+ BridgeImpl.log.info("Connecting bridge " + name + " to its
destination [" + nodeUUID.toString() + "]");
try
{
@@ -510,7 +510,7 @@
queue.addConsumer(BridgeImpl.this);
queue.deliverAsync();
- BridgeImpl.log.info("Bridge " + name + " is connected to its
destination");
+ BridgeImpl.log.info("Bridge " + name + " is connected ["
+ nodeUUID + "-> " + name +"]");
return true;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -164,9 +164,18 @@
return;
}
- serverLocator.registerTopologyListener(this);
+ serverLocator.addClusterTopologyListener(this);
serverLocator.start();
+ // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+ server.getExecutorFactory().getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ serverLocator.connect();
+ }
+ });
+
started = true;
if (managementService != null)
@@ -187,7 +196,7 @@
return;
}
- serverLocator.unregisterTopologyListener(this);
+ serverLocator.removeClusterTopologyListener(this);
for (MessageFlowRecord record : records.values())
{
@@ -334,7 +343,6 @@
final Queue queue,
final boolean start) throws Exception
{
- System.out.println("ClusterConnectionImpl.createNewRecord() " +
connector);
MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
Bridge bridge = new ClusterConnectionBridge(serverLocator,
@@ -801,4 +809,18 @@
{
return records;
}
+
+ public String description()
+ {
+ String out = name + " connected to\n";
+ for (Entry<String, MessageFlowRecord> messageFlow : records.entrySet())
+ {
+ String nodeID = messageFlow.getKey();
+ Bridge bridge = messageFlow.getValue().getBridge();
+
+ out += "\t" + nodeID + " -- " + bridge.isStarted() +
"\n";
+ }
+
+ return out;
+ }
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -91,6 +91,13 @@
private final boolean clustered;
+ // FIXME why do we distinguish between client listeners and cluster connection
listeners?
+ // They are both notified at the same time...
+ private Set<ClusterTopologyListener> clientListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
+ private Set<ClusterTopologyListener> clusterConnectionListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
+
+ private Map<String, Pair<TransportConfiguration, TransportConfiguration>>
topology = new HashMap<String,
Pair<TransportConfiguration,TransportConfiguration>>();
+
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final HornetQServer server,
final PostOffice postOffice,
@@ -221,13 +228,7 @@
return clusterConnections.get(name.toString());
}
- private Set<ClusterTopologyListener> clientListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
-
- private Set<ClusterTopologyListener> clusterConnectionListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
-
- private Map<String, Pair<TransportConfiguration, TransportConfiguration>>
topology = new HashMap<String,
Pair<TransportConfiguration,TransportConfiguration>>();
-
- public synchronized void registerTopologyListener(final ClusterTopologyListener
listener,
+ public synchronized void addClusterTopologyListener(final ClusterTopologyListener
listener,
final boolean clusterConnection)
{
if (clusterConnection)
@@ -248,7 +249,7 @@
}
}
- public synchronized void unregisterTopologyListener(final ClusterTopologyListener
listener,
+ public synchronized void removeClusterTopologyListener(final ClusterTopologyListener
listener,
final boolean clusterConnection)
{
if (clusterConnection)
@@ -325,28 +326,6 @@
}
- public synchronized void notifyNodeDown(final String nodeID)
- {
- topology.remove(nodeID);
-
- for (ClusterTopologyListener listener : clientListeners)
- {
- listener.nodeDown(nodeID);
- }
- }
-
- public synchronized void notifyNodeUP(final String nodeID,
- final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
- final boolean last)
- {
- topology.put(nodeID, connectorPair);
-
- for (ClusterTopologyListener listener : clientListeners)
- {
- listener.nodeUP(nodeID, connectorPair, false);
- }
- }
-
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration
config) throws Exception
{
if (broadcastGroups.containsKey(config.getName()))
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-22
12:59:20 UTC (rev 9454)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-22
13:01:25 UTC (rev 9455)
@@ -146,7 +146,7 @@
private volatile SimpleString nodeID;
private volatile UUID uuid;
-
+
private final Version version;
private final HornetQSecurityManager securityManager;
@@ -736,7 +736,7 @@
started = true;
- HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " started");
+ HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " [" + nodeID + "]�started");
if (configuration.isBackup())
{
@@ -909,7 +909,7 @@
backupActivationThread.join();
}
- HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " stopped");
+ HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " [" + nodeID + "]�stopped");
Logger.reset();
}
@@ -1586,12 +1586,11 @@
// when the cluster manager is started, it will form a cluster -> other nodes
will then create bridges
// to connect to this server. If the remoting service is not started before, the
connection will fail
// and the cluster will not be formed...
+ initialised = true;
+
remotingService.start();
clusterManager.start();
-
- initialised = true;
-
}
/**