Author: clebert.suconic(a)jboss.com
Date: 2011-07-28 13:01:30 -0400 (Thu, 28 Jul 2011)
New Revision: 11064
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
test fixes
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28
03:47:39 UTC (rev 11063)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28
17:01:30 UTC (rev 11064)
@@ -78,7 +78,7 @@
private StaticConnector staticConnector = new StaticConnector();
- private final Topology topology = new Topology(this);
+ private final Topology topology;
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -358,11 +358,14 @@
}
}
- private ServerLocatorImpl(final boolean useHA,
+ private ServerLocatorImpl(final Topology topology,
+ final boolean useHA,
final DiscoveryGroupConfiguration
discoveryGroupConfiguration,
final TransportConfiguration[] transportConfigs)
{
e.fillInStackTrace();
+
+ this.topology = topology;
this.ha = useHA;
this.discoveryGroupConfiguration = discoveryGroupConfiguration;
@@ -440,7 +443,7 @@
*/
public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration
groupConfiguration)
{
- this(useHA, groupConfiguration, null);
+ this(new Topology(null), useHA, groupConfiguration, null);
}
/**
@@ -450,9 +453,30 @@
*/
public ServerLocatorImpl(final boolean useHA, final TransportConfiguration...
transportConfigs)
{
- this(useHA, null, transportConfigs);
+ this(new Topology(null), useHA, null, transportConfigs);
}
+ /**
+ * Create a ServerLocatorImpl using UDP discovery to lookup cluster
+ *
+ * @param discoveryAddress
+ * @param discoveryPort
+ */
+ public ServerLocatorImpl(final Topology topology, final boolean useHA, final
DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(topology, useHA, groupConfiguration, null);
+ }
+
+ /**
+ * Create a ServerLocatorImpl using a static list of live servers
+ *
+ * @param transportConfigs
+ */
+ public ServerLocatorImpl(final Topology topology, final boolean useHA, final
TransportConfiguration... transportConfigs)
+ {
+ this(topology, useHA, null, transportConfigs);
+ }
+
private TransportConfiguration selectConnector()
{
if (receivedTopology)
@@ -1187,7 +1211,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX YYY nodeDown " + this + " nodeID=" + nodeID +
" as being down", new Exception("trace"));
+ log.debug("XXX ZZZ nodeDown " + this + " nodeID=" + nodeID +
" as being down", new Exception("trace"));
}
removed = topology.removeMember(nodeID);
@@ -1236,7 +1260,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX YYY NodeUp " + this + "::nodeID=" + nodeID +
", connectorPair=" + connectorPair);
+ log.debug("XXX ZZZ NodeUp " + this + "::nodeID=" + nodeID +
", connectorPair=" + connectorPair, new Exception ("trace"));
}
topology.addMember(nodeID, new TopologyMember(connectorPair));
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-28
03:47:39 UTC (rev 11063)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-28
17:01:30 UTC (rev 11064)
@@ -13,6 +13,7 @@
package org.hornetq.core.client.impl;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -52,6 +53,7 @@
public Topology(final Object owner)
{
this.owner = owner;
+ log.debug("ZZZ III Topology@" +
Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception
("trace")); // Delete this line
}
/*
@@ -75,8 +77,12 @@
}
if(currentMember == null)
{
+ replaced = true;
+ if (log.isDebugEnabled())
+ {
+ log.debug("ZZZ " + this + " MEMBER WAS NULL, Add member
nodeId=" + nodeId + " member = " + member + " replaced = " +
replaced + " size = " + topology.size(), new Exception ("trace"));
+ }
topology.put(nodeId, member);
- replaced = true;
}
else
{
@@ -103,8 +109,14 @@
if(debug)
{
log.debug(this + "::Topology updated=" + replaced);
- log.debug(describe("After:"));
+ log.debug(describe(this + "::After:"));
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("ZZZ " + this + " Add member nodeId=" + nodeId +
" member = " + member + " replaced = " + replaced + " size =
" + topology.size(), new Exception ("trace"));
+ }
+
return replaced;
}
@@ -113,12 +125,12 @@
TopologyMember member = topology.remove(nodeId);
if (log.isDebugEnabled())
{
- log.debug("XXX " + this + " removing nodeID=" + nodeId +
", result=" + member, new Exception ("trace"));
+ log.debug("ZZZ " + this + " removing nodeID=" + nodeId +
", result=" + member + ", size = " + topology.size(), new Exception
("trace"));
}
return (member != null);
}
- public synchronized void sendTopology(ClusterTopologyListener listener)
+ public void sendTopology(ClusterTopologyListener listener)
{
int count = 0;
Map<String, TopologyMember> copy;
@@ -144,14 +156,21 @@
public Collection<TopologyMember> getMembers()
{
- return topology.values();
+ ArrayList<TopologyMember> members;
+ synchronized (this)
+ {
+ members = new ArrayList<TopologyMember>(topology.values());
+ }
+ return members;
}
- public int nodes()
+ public synchronized int nodes()
{
int count = 0;
for (TopologyMember member : topology.values())
{
+
+ // ARRUMAR ISSO
if (member.getConnector().a != null)
{
count++;
@@ -182,6 +201,10 @@
public void clear()
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("ZZZ " + this + "::clear", new Exception
("trace"));
+ }
topology.clear();
}
@@ -224,7 +247,7 @@
}
else
{
- return "Topology [owner=" + owner + "]";
+ return "Topology@" +
Integer.toHexString(System.identityHashCode(this)) + "[owner=" + owner +
"]";
}
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-28
03:47:39 UTC (rev 11063)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-28
17:01:30 UTC (rev 11064)
@@ -118,12 +118,12 @@
{
public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last));
+ channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last));
}
public void nodeDown(String nodeID)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID));
}
};
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28
03:47:39 UTC (rev 11063)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28
17:01:30 UTC (rev 11064)
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -34,7 +35,9 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -70,7 +73,11 @@
private static final boolean isTrace = log.isTraceEnabled();
- private final org.hornetq.utils.ExecutorFactory executorFactory;
+ private final ExecutorFactory executorFactory;
+
+ private final Topology clusterManagerTopology;
+
+ private final Executor executor;
private final HornetQServer server;
@@ -127,6 +134,7 @@
private final ClusterManagerImpl manager;
public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ final Topology clusterManagerTopology,
final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
@@ -183,6 +191,8 @@
this.routeWhenNoConsumers = routeWhenNoConsumers;
this.executorFactory = executorFactory;
+
+ this.executor = executorFactory.getExecutor();
this.server = server;
@@ -203,6 +213,8 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
this.manager = manager;
+
+ this.clusterManagerTopology = clusterManagerTopology;
clusterConnector = new StaticClusterConnector(tcConfigs);
@@ -219,6 +231,7 @@
}
public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ final Topology clusterManagerTopology,
DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
@@ -275,6 +288,8 @@
this.routeWhenNoConsumers = routeWhenNoConsumers;
this.executorFactory = executorFactory;
+
+ this.executor = executorFactory.getExecutor();
this.server = server;
@@ -297,6 +312,8 @@
clusterConnector = new DiscoveryClusterConnector(dg);
this.manager = manager;
+
+ this.clusterManagerTopology = clusterManagerTopology;
}
public synchronized void start() throws Exception
@@ -352,12 +369,18 @@
props);
managementService.sendNotification(notification);
}
+
+ executor.execute(new Runnable(){
+ public void run()
+ {
+ if(serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
- if(serverLocator != null)
- {
- serverLocator.close();
- serverLocator = null;
- }
+ }
+ });
started = false;
}
@@ -1258,7 +1281,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for
" + Arrays.toString(tcConfigs));
}
- return (ServerLocatorInternal)
HornetQClient.createServerLocatorWithHA(tcConfigs);
+ return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
}
else
{
@@ -1289,7 +1312,7 @@
public ServerLocatorInternal createServerLocator()
{
- return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
+ return new ServerLocatorImpl(clusterManagerTopology, true, dg);
}
}
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28
03:47:39 UTC (rev 11063)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28
17:01:30 UTC (rev 11064)
@@ -245,13 +245,17 @@
backupServerLocator = null;
}
- for (ServerLocator clusterLocator : clusterLocators)
+ executor.execute(new Runnable()
{
- log.info("WWW Closing clusterLocator " + clusterLocator);
- clusterLocator.close();
- log.info("WWW Closed clusterLocator " + clusterLocator);
- }
- clusterLocators.clear();
+ public void run()
+ {
+ for (ServerLocator clusterLocator : clusterLocators)
+ {
+ clusterLocator.close();
+ }
+ clusterLocators.clear();
+ }
+ });
started = false;
topologyListeners.clear();
@@ -829,6 +833,7 @@
}
clusterConnection = new ClusterConnectionImpl(this,
+ topology,
dg,
connector,
new
SimpleString(config.getName()),
@@ -865,6 +870,7 @@
}
clusterConnection = new ClusterConnectionImpl(this,
+ topology,
tcConfigs,
connector,
new
SimpleString(config.getName()),