[hornetq-commits] JBoss hornetq SVN: r9444 - in branches/2_2_0_HA_Improvements/src/main/org/hornetq: core/client/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Jul 21 06:17:34 EDT 2010
Author: jmesnil
Date: 2010-07-21 06:17:33 -0400 (Wed, 21 Jul 2010)
New Revision: 9444
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HA refactoring
* fix cluster code using discovery group (example/jms/clustered-queue passes again)
* ServerLocator and ClusterManager no longer implement ClusterTopologyListener as they /notify/ the listeners
* DiscoveryEntry keeps track of the nodeID to be able to notify listeners of the node which has joined the cluster
* in ServerLocatorImpl.connectorsChanged(), notify the listeners that a node is UP
/!\ we must also keep track of previous entries to notify the listeners when a node is DOWN
* in HornetQServerImpl.initialisePart2(), start the remoting service *before* the cluster manager. Otherwise the
bridges created when the cluster is formed will not be able to connect to the server and the cluster will not be formed
/!\ this is a modification with lots of implications... not sure about this one
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ServerLocator.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy;
@@ -660,5 +661,9 @@
void unregisterTopologyListener(ClusterTopologyListener listener);
+ void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+
+ void notifyNodeDown(String nodeID);
+
boolean isHA();
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -1156,11 +1156,11 @@
if (topMessage.isExit())
{
- serverLocator.nodeDown(topMessage.getNodeID());
+ serverLocator.notifyNodeDown(topMessage.getNodeID());
}
else
{
- serverLocator.nodeUP(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
+ serverLocator.notifyNodeUP(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
}
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -297,6 +297,16 @@
discoveryGroup.start();
}
+
+ if (initialConnectors != null)
+ {
+ System.out.println(">>>>>>>> Static initial connectors = " + Arrays.asList(initialConnectors));
+ for (int i = 0; i < initialConnectors.length; i++)
+ {
+ // FIXME and now what do I do?
+ TransportConfiguration connector = initialConnectors[i];
+ }
+ }
readOnly = true;
}
@@ -1031,7 +1041,7 @@
closed = true;
}
- public synchronized void nodeDown(final String nodeID)
+ public synchronized void notifyNodeDown(final String nodeID)
{
if (!ha)
{
@@ -1059,7 +1069,7 @@
}
}
- public synchronized void nodeUP(final String nodeID,
+ public synchronized void notifyNodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
@@ -1114,9 +1124,11 @@
for (DiscoveryEntry entry : newConnectors)
{
this.initialConnectors[count++] = entry.getConnector();
+
+ notifyNodeUP(entry.getNodeID(), new Pair<TransportConfiguration, TransportConfiguration>(entry.getConnector(), null), true);
}
- System.out.println(">>>>>>>> Initial connectors = " + Arrays.asList(initialConnectors));
+ System.out.println(">>>>>>>> Discovered initial connectors= " + Arrays.asList(initialConnectors));
}
public synchronized void factoryClosed(final ClientSessionFactory factory)
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -15,7 +15,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
/**
@@ -25,7 +24,7 @@
*
*
*/
-public interface ServerLocatorInternal extends ServerLocator, ClusterTopologyListener
+public interface ServerLocatorInternal extends ServerLocator
{
void start() throws Exception;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/DiscoveryEntry.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -24,17 +24,23 @@
*/
public class DiscoveryEntry
{
+ private final String nodeID;
private final TransportConfiguration connector;
-
private final long lastUpdate;
- public DiscoveryEntry(final TransportConfiguration connector, final long lastUpdate)
+
+ public DiscoveryEntry(final String nodeID, final TransportConfiguration connector, final long lastUpdate)
{
+ this.nodeID = nodeID;
this.connector = connector;
-
this.lastUpdate = lastUpdate;
}
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
public TransportConfiguration getConnector()
{
return connector;
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -330,7 +330,7 @@
connector.decode(buffer);
- DiscoveryEntry entry = new DiscoveryEntry(connector, System.currentTimeMillis());
+ DiscoveryEntry entry = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
DiscoveryEntry oldVal = connectors.put(originatingNodeID, entry);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -16,7 +16,9 @@
import java.util.Map;
import java.util.Set;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.server.HornetQComponent;
@@ -28,7 +30,7 @@
* Created 18 Nov 2008 09:23:26
*
*/
-public interface ClusterManager extends HornetQComponent, ClusterTopologyListener
+public interface ClusterManager extends HornetQComponent
{
Map<String, Bridge> getBridges();
@@ -38,6 +40,10 @@
Set<BroadcastGroup> getBroadcastGroups();
+ void notifyNodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
+
+ void notifyNodeDown(String nodeID);
+
void registerTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
void unregisterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -490,7 +490,6 @@
try
{
csf = createSessionFactory();
-
// Session is pre-acknowledge
session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
@@ -501,9 +500,7 @@
}
producer = session.createProducer();
-
session.addFailureListener(BridgeImpl.this);
-
session.setSendAcknowledgementHandler(BridgeImpl.this);
afterConnect();
@@ -511,7 +508,6 @@
active = true;
queue.addConsumer(BridgeImpl.this);
-
queue.deliverAsync();
BridgeImpl.log.info("Bridge " + name + " is connected to its destination");
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -265,8 +265,6 @@
public synchronized void nodeDown(final String nodeID)
{
- server.getClusterManager().nodeDown(nodeID);
-
//Remove the flow record for that node
MessageFlowRecord record = records.remove(nodeID);
@@ -290,8 +288,6 @@
{
try
{
- server.getClusterManager().nodeUP(nodeID, connectorPair, false);
-
MessageFlowRecord record = records.get(nodeID);
if (record == null)
@@ -307,25 +303,23 @@
if (queueBinding != null)
{
queue = (Queue)queueBinding.getBindable();
-
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
else
{
// Add binding in storage so the queue will get reloaded on startup and we can find it - it's never
// actually routed to at that address though
-
queue = server.createQueue(queueName, queueName, null, true, false);
-
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
+
+ createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
else
{
- if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
- {
- // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
- }
+ // FIXME apple and orange comparison. I don't understand it...
+ //if (!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
+ // {
+ // // New live node - close it and recreate it - TODO - CAN THIS EVER HAPPEN?
+ //}
}
}
catch (Exception e)
@@ -340,6 +334,7 @@
final Queue queue,
final boolean start) throws Exception
{
+ System.out.println("ClusterConnectionImpl.createNewRecord() " + connector);
MessageFlowRecordImpl record = new MessageFlowRecordImpl(queue);
Bridge bridge = new ClusterConnectionBridge(serverLocator,
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -325,7 +325,7 @@
}
- public synchronized void nodeDown(final String nodeID)
+ public synchronized void notifyNodeDown(final String nodeID)
{
topology.remove(nodeID);
@@ -335,7 +335,7 @@
}
}
- public synchronized void nodeUP(final String nodeID,
+ public synchronized void notifyNodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-21 09:43:15 UTC (rev 9443)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-07-21 10:17:33 UTC (rev 9444)
@@ -1579,14 +1579,19 @@
deploymentManager.start();
}
+ // We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
+ // it is activated
+
+ // FIXME -- I inverted the order to start the remoting service before the cluster manager.
+ // when the cluster manager is started, it will form a cluster -> other nodes will then create bridges
+ // to connect to this server. If the remoting service is not started before, the connection will fail
+ // and the cluster will not be formed...
+ remotingService.start();
+
clusterManager.start();
initialised = true;
- // We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until
- // it is activated
-
- remotingService.start();
}
/**
More information about the hornetq-commits
mailing list