Author: ataylor
Date: 2010-09-22 05:54:22 -0400 (Wed, 22 Sep 2010)
New Revision: 9712
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
added info about source of topology update
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration>
connectorPair, boolean last, int distance);
+ void nodeUP(String nodeID, String sourceNodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance);
void nodeDown(String nodeID);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -1075,7 +1075,7 @@
if (serverLocator.isClusterConnection())
{
TransportConfiguration config =
serverLocator.getClusterTransportConfiguration();
- channel0.send(new NodeAnnounceMessage(serverLocator.getNodeID(),
serverLocator.isBackup(), config));
+ channel0.send(new
NodeAnnounceMessage(serverLocator.getNodeID(),serverLocator.getNodeID(),
serverLocator.isBackup(), config));
}
}
}
@@ -1177,7 +1177,7 @@
}
else
{
- serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(),
topMessage.isLast(), topMessage.getDistance());
+ serverLocator.notifyNodeUp(topMessage.getNodeID(),
topMessage.getSourceNodeID(), topMessage.getPair(), topMessage.isLast(),
topMessage.getDistance());
}
}
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -1124,6 +1124,7 @@
}
public synchronized void notifyNodeUp(final String nodeID,
+ final String sourceNodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
final boolean last,
final int distance)
@@ -1144,7 +1145,7 @@
for (ClusterTopologyListener listener : topologyListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
}
// Notify if waiting on getting topology
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -40,7 +40,7 @@
ClientSessionFactory connect() throws Exception;
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance);
+ void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance);
void notifyNodeDown(String nodeID);
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -71,12 +71,12 @@
return (member != null);
}
- public synchronized void fireListeners(ClusterTopologyListener listener)
+ public synchronized void fireListeners(ClusterTopologyListener listener, String
sourceNodeId)
{
int count = 0;
for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
{
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count ==
topology.size(), entry.getValue().getDistance());
+ listener.nodeUP(entry.getKey(), sourceNodeId, entry.getValue().getConnector(),
++count == topology.size(), entry.getValue().getDistance());
}
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -111,9 +111,9 @@
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last,
int distance)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last, distance + 1));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, sourceNodeID,
connectorPair, last, distance + 1));
}
public void nodeDown(String nodeID)
@@ -147,7 +147,7 @@
{
pair = new Pair<TransportConfiguration,
TransportConfiguration>(msg.getConnector(), null);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(),
msg.getSourceNodeID(), pair, false, 1);
}
}
});
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -34,6 +34,8 @@
private boolean exit;
private String nodeID;
+
+ private String sourceNodeID;
private Pair<TransportConfiguration, TransportConfiguration> pair;
@@ -45,11 +47,13 @@
// Constructors --------------------------------------------------
- public ClusterTopologyChangeMessage(final String nodeID, final
Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int
distance)
+ public ClusterTopologyChangeMessage(final String nodeID, String sourceNodeID, final
Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int
distance)
{
super(PacketImpl.CLUSTER_TOPOLOGY);
this.nodeID = nodeID;
+
+ this.sourceNodeID = sourceNodeID;
this.pair = pair;
@@ -80,6 +84,11 @@
{
return nodeID;
}
+
+ public String getSourceNodeID()
+ {
+ return sourceNodeID;
+ }
public Pair<TransportConfiguration, TransportConfiguration> getPair()
{
@@ -112,7 +121,8 @@
buffer.writeBoolean(exit);
buffer.writeString(nodeID);
if (!exit)
- {
+ {
+ buffer.writeString(sourceNodeID);
if (pair.a != null)
{
buffer.writeBoolean(true);
@@ -143,6 +153,7 @@
nodeID = buffer.readString();
if (!exit)
{
+ sourceNodeID = buffer.readString();
boolean hasLive = buffer.readBoolean();
TransportConfiguration a;
if(hasLive)
@@ -179,4 +190,5 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -36,15 +36,19 @@
private TransportConfiguration connector;
+ private String sourceNodeID;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public NodeAnnounceMessage(final String nodeID, final boolean backup, final
TransportConfiguration tc)
+ public NodeAnnounceMessage(final String nodeID, final String sourceNodeID, final
boolean backup, final TransportConfiguration tc)
{
super(PacketImpl.NODE_ANNOUNCE);
this.nodeID = nodeID;
+
+ this.sourceNodeID = sourceNodeID;
this.backup = backup;
@@ -64,6 +68,11 @@
return nodeID;
}
+ public String getSourceNodeID()
+ {
+ return sourceNodeID;
+ }
+
public boolean isBackup()
{
return backup;
@@ -79,6 +88,7 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
+ buffer.writeString(sourceNodeID);
buffer.writeBoolean(backup);
connector.encode(buffer);
}
@@ -87,6 +97,7 @@
public void decodeRest(final HornetQBuffer buffer)
{
this.nodeID = buffer.readString();
+ this.sourceNodeID = buffer.readString();
this.backup = buffer.readBoolean();
connector = new TransportConfiguration();
connector.decode(buffer);
@@ -100,4 +111,5 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -49,7 +49,7 @@
void notifyNodeDown(String nodeID);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean backup, int distance);
+ void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean backup, int distance);
Topology getTopology();
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -332,19 +332,23 @@
}
public synchronized void nodeUP(final String nodeID,
+ final String sourceNodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
final boolean last,
final int distance)
{
- // discard notifications about ourselves
+ // discard notifications about ourselves unless its from our backup
if (nodeID.equals(nodeUUID.toString()))
{
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+ if(sourceNodeID.equals(nodeUUID.toString()) && connectorPair.b != null)
+ {
+ server.getClusterManager().notifyNodeUp(nodeID, sourceNodeID, connectorPair,
last, distance);
+ }
return;
}
// we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+ server.getClusterManager().notifyNodeUp(nodeID, sourceNodeID, connectorPair, last,
distance);
// if the node is more than 1 hop away, we do not create a bridge for direct
cluster connection
if (allowsDirectConnectionsOnly && distance > 1)
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -207,7 +207,7 @@
}
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last,
int distance)
{
//todo update the topology
}
@@ -218,7 +218,7 @@
}
});
backupSessionFactory = locator.connect();
- backupSessionFactory.getConnection().getChannel(0, -1).send(new
NodeAnnounceMessage(nodeUUID.toString(), true,
configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
+ backupSessionFactory.getConnection().getChannel(0, -1).send(new
NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true,
configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
}
public synchronized void stop() throws Exception
@@ -287,6 +287,7 @@
}
public void notifyNodeUp(String nodeID,
+ String sourceNodeID,
Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
boolean last,
int distance)
@@ -298,14 +299,14 @@
}
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
}
if (distance < topology.nodes())
{
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, sourceNodeID, connectorPair, last, distance);
}
}
}
@@ -348,7 +349,7 @@
}
// We now need to send the current topology to the client
- topology.fireListeners(listener);
+ topology.fireListeners(listener, nodeUUID.toString());
}
public synchronized void removeClusterTopologyListener(final ClusterTopologyListener
listener,
@@ -439,12 +440,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, nodeID, member.getConnector(), false,
member.getDistance());
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, nodeID, member.getConnector(), false,
member.getDistance());
}
}
}
@@ -489,12 +490,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, nodeID, member.getConnector(), false,
member.getDistance());
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, nodeID, member.getConnector(), false,
member.getDistance());
}
}
@@ -816,6 +817,17 @@
public void clear()
{
bridges.clear();
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ try
+ {
+ clusterConnection.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings
| File Templates.
+ }
+ }
clusterConnections.clear();
}
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -366,7 +366,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last,
int distance)
{
if(connectorPair.a != null &&
!liveNode.contains(connectorPair.a.getName()))
{
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -193,7 +193,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last,
int distance)
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -88,6 +88,8 @@
locator2.setReconnectAttempts(-1);
ClientSessionFactoryInternal sf2 = createSessionFactoryAndWaitForTopology(locator2,
4);
ClientSession session2 = sendAndConsume(sf2, true);
+
+
servers.get(3).crash(session2);
int liveAfter3 = waitForBackup(10000, servers, 4, 5);
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-09-22
09:07:31 UTC (rev 9711)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-09-22
09:54:22 UTC (rev 9712)
@@ -201,7 +201,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last,
int distance)
@@ -264,7 +264,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last,
int distance)
@@ -337,7 +337,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last,
int distance)
@@ -419,7 +419,7 @@
locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- public void nodeUP(String nodeID,
+ public void nodeUP(String nodeID, String sourceNodeID,
Pair<TransportConfiguration, TransportConfiguration>
connectorPair,
boolean last,
int distance)