[hornetq-commits] JBoss hornetq SVN: r10064 - in trunk: src/main/org/hornetq/core/client/impl and 8 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Dec 21 03:05:48 EST 2010
Author: ataylor
Date: 2010-12-21 03:05:47 -0500 (Tue, 21 Dec 2010)
New Revision: 10064
Modified:
trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
trunk/src/main/org/hornetq/core/client/impl/Topology.java
trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
Log:
removed distance parameter from nodeup as not needed
Modified: trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/api/core/client/ClusterTopologyListener.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -26,7 +26,7 @@
*/
public interface ClusterTopologyListener
{
- void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+ void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
void nodeDown(String nodeID);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -1243,8 +1243,7 @@
{
serverLocator.notifyNodeUp(topMessage.getNodeID(),
topMessage.getPair(),
- topMessage.isLast(),
- topMessage.getDistance());
+ topMessage.isLast());
}
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -1139,15 +1139,14 @@
public synchronized void notifyNodeUp(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last,
- final int distance)
+ final boolean last)
{
if (!ha)
{
return;
}
- topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+ topology.addMember(nodeID, new TopologyMember(connectorPair));
TopologyMember actMember = topology.getMember(nodeID);
@@ -1172,7 +1171,7 @@
for (ClusterTopologyListener listener : topologyListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, connectorPair, last);
}
// Notify if waiting on getting topology
@@ -1242,6 +1241,10 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
topologyListeners.add(listener);
+ if(topology.members() > 0)
+ {
+ System.out.println("ServerLocatorImpl.addClusterTopologyListener");
+ }
}
public void removeClusterTopologyListener(final ClusterTopologyListener listener)
Modified: trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -40,7 +40,7 @@
ClientSessionFactory connect() throws Exception;
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last);
void notifyNodeDown(String nodeID);
Modified: trunk/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/Topology.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/Topology.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -47,7 +47,10 @@
TopologyMember currentMember = topology.get(nodeId);
if (debug)
{
- //System.out.println("member.getConnector() = " + member.getConnector());
+ if(member.getConnector().toString().contains("server-id=4"))
+ {
+ System.out.println("member.getConnector() = " + member.getConnector());
+ }
}
if(currentMember == null)
{
@@ -90,7 +93,7 @@
int count = 0;
for (Map.Entry<String, TopologyMember> entry : topology.entrySet())
{
- listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size(), entry.getValue().getDistance());
+ listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == topology.size());
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/client/impl/TopologyMember.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -27,27 +27,19 @@
private final Pair<TransportConfiguration, TransportConfiguration> connector;
- private final int distance;
-
- public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector, int distance)
+ public TopologyMember(Pair<TransportConfiguration, TransportConfiguration> connector)
{
this.connector = connector;
- this.distance = distance;
}
public Pair<TransportConfiguration, TransportConfiguration> getConnector()
{
return connector;
}
-
- public int getDistance()
- {
- return distance;
- }
@Override
public String toString()
{
- return "TopologyMember[distance=" + distance + ", connector=" + connector + "]";
+ return "TopologyMember[connector=" + connector + "]";
}
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -111,9 +111,9 @@
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last, distance + 1));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
}
public void nodeDown(String nodeID)
@@ -147,7 +147,7 @@
{
pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, 1);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false);
}
}
});
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/ClusterTopologyChangeMessage.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -39,13 +39,11 @@
private boolean last;
- private int distance;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last, int distance)
+ public ClusterTopologyChangeMessage(final String nodeID, final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean last)
{
super(PacketImpl.CLUSTER_TOPOLOGY);
@@ -56,8 +54,6 @@
this.last = last;
this.exit = false;
-
- this.distance = distance;
}
public ClusterTopologyChangeMessage(final String nodeID)
@@ -96,16 +92,6 @@
return exit;
}
- public int getDistance()
- {
- return distance;
- }
-
- public void setDistance(int distance)
- {
- this.distance = distance;
- }
-
@Override
public void encodeRest(final HornetQBuffer buffer)
{
@@ -132,7 +118,6 @@
buffer.writeBoolean(false);
}
buffer.writeBoolean(last);
- buffer.writeInt(distance);
}
}
@@ -167,7 +152,6 @@
}
pair = new Pair<TransportConfiguration, TransportConfiguration>(a, b);
last = buffer.readBoolean();
- distance = buffer.readInt();
}
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -50,7 +50,7 @@
void notifyNodeDown(String nodeID);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
Topology getTopology();
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -29,6 +29,7 @@
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -83,6 +84,8 @@
private final Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
+ private final List<TransportConfiguration> conectorssss = new ArrayList<TransportConfiguration>();
+
private final ScheduledExecutorService scheduledExecutor;
private final int maxHops;
@@ -413,8 +416,7 @@
public synchronized void nodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last,
- final int distance)
+ final boolean last)
{
// discard notifications about ourselves unless its from our backup
@@ -422,16 +424,16 @@
{
if(connectorPair.b != null)
{
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
}
return;
}
// we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
// if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
- if (allowDirectConnectionsOnly && distance > 1 && !allowableConnections.contains(connectorPair.a))
+ if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
return;
}
@@ -448,6 +450,18 @@
return;
}
+ Collection<TopologyMember> topologyMembers = serverLocator.getTopology().getMembers();
+ for (TopologyMember topologyMember : topologyMembers)
+ {
+ if(topologyMember.getConnector().a != null && !conectorssss.contains(topologyMember.getConnector().a))
+ {
+ if(!topologyMember.getConnector().a.equals(connector) && !topologyMember.getConnector().a.equals(connectorPair.a))
+ {
+ System.out.println("ClusterConnectionImpl.nodeUP");
+ }
+ }
+ }
+
try
{
MessageFlowRecord record = records.get(nodeID);
@@ -474,6 +488,7 @@
}
createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
+ conectorssss.add(connectorPair.a);
}
else
{
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -253,10 +253,9 @@
public void notifyNodeUp(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
- boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair, distance));
+ boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair));
if(!updated)
{
@@ -265,15 +264,13 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, connectorPair, last, distance);
+ listener.nodeUP(nodeID, connectorPair, last);
}
- if (distance < topology.nodes())
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- for (ClusterTopologyListener listener : clusterConnectionListeners)
- {
- listener.nodeUP(nodeID, connectorPair, last, distance);
- }
+ listener.nodeUP(nodeID, connectorPair, last);
}
}
@@ -407,12 +404,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
}
}
@@ -455,11 +452,11 @@
{
if (backup)
{
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()), 0);
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(null, cc.getConnector()));
}
else
{
- member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null), 0);
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(cc.getConnector(), null));
}
topology.addMember(nodeID, member);
@@ -480,12 +477,12 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -149,7 +149,7 @@
setupCluster();
startServers(5, 0);
-
+ servers[0].getClusterManager().getTopology().setDebug(true);
setupSessionFactory(0, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -398,7 +398,7 @@
this.latch = latch;
}
- public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last, int distance)
+ public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
if(connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -215,8 +215,7 @@
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -203,8 +203,7 @@
{
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if(!nodes.contains(nodeID))
{
@@ -264,8 +263,7 @@
{
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (!nodes.contains(nodeID))
{
@@ -337,8 +335,7 @@
{
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (!nodes.contains(nodeID))
{
@@ -420,8 +417,7 @@
{
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (!nodes.contains(nodeID))
{
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-12-21 01:50:48 UTC (rev 10063)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-12-21 08:05:47 UTC (rev 10064)
@@ -298,8 +298,7 @@
public void nodeUP(String nodeID,
Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last,
- int distance)
+ boolean last)
{
if (connectorPair.a != null && !liveNode.contains(connectorPair.a.getName()))
{
More information about the hornetq-commits
mailing list