Author: ataylor
Date: 2011-02-28 14:19:53 -0500 (Mon, 28 Feb 2011)
New Revision: 10268
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
fixing unit tests
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-02-28
18:48:55 UTC (rev 10267)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-02-28
19:19:53 UTC (rev 10268)
@@ -147,7 +147,7 @@
{
pair = new Pair<TransportConfiguration,
TransportConfiguration>(msg.getConnector(), null);
}
- server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false);
+ server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false,
true);
}
}
});
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-02-28
18:48:55 UTC (rev 10267)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-02-28
19:19:53 UTC (rev 10268)
@@ -15,6 +15,7 @@
import java.util.Map;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -54,4 +55,6 @@
// for debug
String description();
+
+ void nodeAnnounced(String nodeID,
Pair<TransportConfiguration,TransportConfiguration> connectorPair);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-02-28
18:48:55 UTC (rev 10267)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-02-28
19:19:53 UTC (rev 10268)
@@ -50,7 +50,7 @@
void notifyNodeDown(String nodeID);
- void notifyNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean backup);
+ void notifyNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean backup, boolean nodeAnnounce);
Topology getTopology();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-02-28
18:48:55 UTC (rev 10267)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-02-28
19:19:53 UTC (rev 10268)
@@ -35,7 +35,6 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
-import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -84,8 +83,6 @@
private final Map<String, MessageFlowRecord> records = new HashMap<String,
MessageFlowRecord>();
- private final List<TransportConfiguration> conectorssss = new
ArrayList<TransportConfiguration>();
-
private final ScheduledExecutorService scheduledExecutor;
private final int maxHops;
@@ -425,13 +422,13 @@
{
if(connectorPair.b != null)
{
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
}
return;
}
// we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last);
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
// if the node is more than 1 hop away, we do not create a bridge for direct
cluster connection
if (allowDirectConnectionsOnly &&
!allowableConnections.contains(connectorPair.a))
@@ -451,59 +448,120 @@
return;
}
- Collection<TopologyMember> topologyMembers =
serverLocator.getTopology().getMembers();
- for (TopologyMember topologyMember : topologyMembers)
+ synchronized (records)
{
- if(topologyMember.getConnector().a != null &&
!conectorssss.contains(topologyMember.getConnector().a))
+ try
{
- if(!topologyMember.getConnector().a.equals(connector) &&
!topologyMember.getConnector().a.equals(connectorPair.a))
+ MessageFlowRecord record = records.get(nodeID);
+
+ if (record == null)
{
- log.debug("ClusterConnectionImpl.nodeUP");
+ // New node - create a new flow record
+
+ final SimpleString queueName = new SimpleString("sf." + name +
"." + nodeID);
+
+ Binding queueBinding = postOffice.getBinding(queueName);
+
+ Queue queue;
+
+ if (queueBinding != null)
+ {
+ queue = (Queue)queueBinding.getBindable();
+ }
+ else
+ {
+ // Add binding in storage so the queue will get reloaded on startup and
we can find it - it's never
+ // actually routed to at that address though
+ queue = server.createQueue(queueName, queueName, null, true, false);
+ }
+
+ createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
+ else
+ {
+ // FIXME apple and orange comparison. I don't understand it...
+ //if
(!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
+ // {
+ // // New live node - close it and recreate it - TODO - CAN THIS EVER
HAPPEN?
+ //}
+ }
}
+ catch (Exception e)
+ {
+ log.error("Failed to update topology", e);
+ }
}
+ }
- try
+ public void nodeAnnounced(final String nodeID,
+ final Pair<TransportConfiguration,
TransportConfiguration> connectorPair)
+ {
+ if (nodeID.equals(nodeUUID.toString()))
{
- MessageFlowRecord record = records.get(nodeID);
+ return;
+ }
- if (record == null)
+ // if the node is more than 1 hop away, we do not create a bridge for direct
cluster connection
+ if (allowDirectConnectionsOnly &&
!allowableConnections.contains(connectorPair.a))
+ {
+ return;
+ }
+
+ // FIXME required to prevent cluster connections w/o discovery group
+ // and empty static connectors to create bridges... ulgy!
+ if (serverLocator == null)
+ {
+ return;
+ }
+ /*we dont create bridges to backups*/
+ if(connectorPair.a == null)
+ {
+ return;
+ }
+
+ synchronized (records)
+ {
+ try
{
- // New node - create a new flow record
+ MessageFlowRecord record = records.get(nodeID);
- final SimpleString queueName = new SimpleString("sf." + name +
"." + nodeID);
+ if (record == null)
+ {
+ // New node - create a new flow record
- Binding queueBinding = postOffice.getBinding(queueName);
+ final SimpleString queueName = new SimpleString("sf." + name +
"." + nodeID);
- Queue queue;
+ Binding queueBinding = postOffice.getBinding(queueName);
- if (queueBinding != null)
- {
- queue = (Queue)queueBinding.getBindable();
+ Queue queue;
+
+ if (queueBinding != null)
+ {
+ queue = (Queue)queueBinding.getBindable();
+ }
+ else
+ {
+ // Add binding in storage so the queue will get reloaded on startup and
we can find it - it's never
+ // actually routed to at that address though
+ queue = server.createQueue(queueName, queueName, null, true, false);
+ }
+
+ createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
else
{
- // Add binding in storage so the queue will get reloaded on startup and we
can find it - it's never
- // actually routed to at that address though
- queue = server.createQueue(queueName, queueName, null, true, false);
+ // FIXME apple and orange comparison. I don't understand it...
+ //if
(!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
+ // {
+ // // New live node - close it and recreate it - TODO - CAN THIS EVER
HAPPEN?
+ //}
}
-
- createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
- conectorssss.add(connectorPair.a);
}
- else
+ catch (Exception e)
{
- // FIXME apple and orange comparison. I don't understand it...
- //if
(!connectorPair.a.equals(record.getBridge().getForwardingConnection().getTransportConnection()))
- // {
- // // New live node - close it and recreate it - TODO - CAN THIS EVER
HAPPEN?
- //}
+ log.error("Failed to update topology", e);
}
}
- catch (Exception e)
- {
- log.error("Failed to update topology", e);
- }
}
private void createNewRecord(final String nodeID,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-02-28
18:48:55 UTC (rev 10267)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-02-28
19:19:53 UTC (rev 10268)
@@ -18,6 +18,7 @@
import java.lang.reflect.Array;
import java.net.InetAddress;
import java.util.*;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -102,6 +103,7 @@
private volatile ServerLocatorInternal backupServerLocator;
private final List<ServerLocatorInternal> clusterLocators = new
ArrayList<ServerLocatorInternal>();
+ private Executor executor;
public ClusterManagerImpl(final ExecutorFactory executorFactory,
final HornetQServer server,
@@ -120,6 +122,8 @@
this.executorFactory = executorFactory;
+ executor = executorFactory.getExecutor();
+
this.server = server;
this.postOffice = postOffice;
@@ -250,11 +254,13 @@
}
}
- public void notifyNodeUp(String nodeID,
- Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
- boolean last)
+ public void notifyNodeUp(final String nodeID,
+ final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
+ final boolean last,
+ final boolean nodeAnnounce)
{
- boolean updated = topology.addMember(nodeID, new TopologyMember(connectorPair));
+ TopologyMember member = new TopologyMember(connectorPair);
+ boolean updated = topology.addMember(nodeID, member);
if(!updated)
{
@@ -263,14 +269,23 @@
for (ClusterTopologyListener listener : clientListeners)
{
- listener.nodeUP(nodeID, connectorPair, last);
+ listener.nodeUP(nodeID, member.getConnector(), last);
}
for (ClusterTopologyListener listener : clusterConnectionListeners)
{
- listener.nodeUP(nodeID, connectorPair, last);
+ listener.nodeUP(nodeID, member.getConnector(), last);
}
+
+ //if this is a node being announced we are hearing it direct from the nodes CM so
need to inform our cluster connections.
+ if (nodeAnnounce)
+ {
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ clusterConnection.nodeAnnounced(nodeID, connectorPair);
+ }
+ }
}
public boolean isStarted()
@@ -836,7 +851,7 @@
return;
}
log.info("announcing backup");
- this.executorFactory.getExecutor().execute(new Runnable()
+ executor.execute(new Runnable()
{
public void run()
{