JBoss hornetq SVN: r11066 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/protocol/core/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-29 00:22:24 -0400 (Fri, 29 Jul 2011)
New Revision: 11066
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
test fixes
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -68,8 +68,6 @@
private String identity;
- private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
-
private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
private TransportConfiguration[] initialConnectors;
@@ -444,6 +442,7 @@
public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
{
this(new Topology(null), useHA, groupConfiguration, null);
+ topology.setOwner(this);
}
/**
@@ -454,6 +453,7 @@
public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
{
this(new Topology(null), useHA, null, transportConfigs);
+ topology.setOwner(this);
}
/**
@@ -1215,7 +1215,7 @@
}
removed = topology.removeMember(nodeID);
-
+
if (!topology.isEmpty())
{
updateArraysAndPairs();
@@ -1232,13 +1232,6 @@
receivedTopology = false;
}
- if (removed)
- {
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeDown(nodeID);
- }
- }
}
public synchronized void notifyNodeUp(final String nodeID,
@@ -1263,7 +1256,7 @@
log.debug("XXX ZZZ NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception ("trace"));
}
- topology.addMember(nodeID, new TopologyMember(connectorPair));
+ topology.addMember(nodeID, new TopologyMember(connectorPair), last);
TopologyMember actMember = topology.getMember(nodeID);
@@ -1286,11 +1279,6 @@
receivedTopology = true;
}
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeUP(nodeID, connectorPair, last);
- }
-
// Notify if waiting on getting topology
notify();
}
@@ -1371,16 +1359,12 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
- topologyListeners.add(listener);
- if (topology.members() > 0)
- {
- log.debug(this + "::ServerLocatorImpl.addClusterTopologyListener");
- }
+ topology.addClusterTopologyListener(listener);
}
public void removeClusterTopologyListener(final ClusterTopologyListener listener)
{
- topologyListeners.remove(listener);
+ topology.removeClusterTopologyListener(listener);
}
public synchronized void addFactory(ClientSessionFactoryInternal factory)
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -15,10 +15,14 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
@@ -30,16 +34,13 @@
*/
public class Topology implements Serializable
{
-
- /**
- *
- */
+
private static final long serialVersionUID = -9037171688692471371L;
-
+ private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
private static final Logger log = Logger.getLogger(Topology.class);
-
+
/** Used to debug operations.
*
* Someone may argue this is not needed. But it's impossible to debug anything related to topology without knowing what node
@@ -47,13 +48,15 @@
*
* Hence I added some information to locate debugging here.
* */
- private final Object owner;
-
-
+ private volatile Object owner;
+
+ private volatile Executor executor;
+
public Topology(final Object owner)
{
this.owner = owner;
- log.debug("ZZZ III Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception ("trace")); // Delete this line
+ Topology.log.debug("ZZZ III Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+ new Exception("trace")); // Delete this line
}
/*
@@ -62,89 +65,224 @@
* keys are node IDs
* values are a pair of live/backup transport configurations
*/
- private Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
+ private final Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
- private boolean debug = log.isDebugEnabled();
+ public void setExecutor(Executor executor)
+ {
+ this.executor = executor;
+ }
- public synchronized boolean addMember(String nodeId, TopologyMember member)
+ public void addClusterTopologyListener(final ClusterTopologyListener listener)
{
- boolean replaced = false;
- TopologyMember currentMember = topology.get(nodeId);
- if (debug)
+ if (log.isDebugEnabled())
{
- log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception ("trace"));
- log.debug(describe("Before:"));
+ log.debug(this + "::PPP Adding topology listener " + listener, new Exception("Trace"));
}
- if(currentMember == null)
+ synchronized (topologyListeners)
{
- replaced = true;
- if (log.isDebugEnabled())
- {
- log.debug("ZZZ " + this + " MEMBER WAS NULL, Add member nodeId=" + nodeId + " member = " + member + " replaced = " + replaced + " size = " + topology.size(), new Exception ("trace"));
- }
- topology.put(nodeId, member);
+ topologyListeners.add(listener);
}
- else
+ }
+
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener)
+ {
+ if (log.isDebugEnabled())
{
- if(hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
+ log.debug(this + "::PPP Removing topology listener " + listener, new Exception("Trace"));
+ }
+ synchronized (topologyListeners)
+ {
+ topologyListeners.remove(listener);
+ }
+ }
+
+ public boolean addMember(final String nodeId, final TopologyMember member, final boolean last)
+ {
+ boolean replaced = false;
+
+ synchronized (this)
+ {
+ TopologyMember currentMember = topology.get(nodeId);
+
+ if (Topology.log.isDebugEnabled())
{
- currentMember.getConnector().a = member.getConnector().a;
- replaced = true;
+ Topology.log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception("trace"));
+ Topology.log.debug(describe("Before:"));
}
- if(hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
+
+ if (currentMember == null)
{
- currentMember.getConnector().b = member.getConnector().b;
replaced = true;
+ if (Topology.log.isDebugEnabled())
+ {
+ Topology.log.debug("ZZZ " + this +
+ " MEMBER WAS NULL, Add member nodeId=" +
+ nodeId +
+ " member = " +
+ member +
+ " replaced = " +
+ replaced +
+ " size = " +
+ topology.size(), new Exception("trace"));
+ }
+ topology.put(nodeId, member);
}
+ else
+ {
+ if (hasChanged(currentMember.getConnector().a, member.getConnector().a) && member.getConnector().a != null)
+ {
+ currentMember.getConnector().a = member.getConnector().a;
+ replaced = true;
+ }
+ if (hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
+ {
+ currentMember.getConnector().b = member.getConnector().b;
+ replaced = true;
+ }
- if(member.getConnector().a == null)
+ if (member.getConnector().a == null)
+ {
+ member.getConnector().a = currentMember.getConnector().a;
+ }
+ if (member.getConnector().b == null)
+ {
+ member.getConnector().b = currentMember.getConnector().b;
+ }
+ }
+
+ if (Topology.log.isDebugEnabled())
{
- member.getConnector().a = currentMember.getConnector().a;
+ Topology.log.debug(this + "::Topology updated=" + replaced);
+ Topology.log.debug(describe(this + "::After:"));
}
- if(member.getConnector().b == null)
+
+ if (Topology.log.isDebugEnabled())
{
- member.getConnector().b = currentMember.getConnector().b;
+ Topology.log.debug("ZZZ " + this +
+ " Add member nodeId=" +
+ nodeId +
+ " member = " +
+ member +
+ " replaced = " +
+ replaced +
+ " size = " +
+ topology.size(), new Exception("trace"));
}
+
}
- if(debug)
+
+ if (replaced)
{
- log.debug(this + "::Topology updated=" + replaced);
- log.debug(describe(this + "::After:"));
+ ArrayList<ClusterTopologyListener> copy = copyListeners();
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace("XXX ZZZ " + this + " informing " + listener + " about node up = " + nodeId);
+ }
+
+ listener.nodeUP(nodeId, member.getConnector(), last);
+ }
}
+
+ return replaced;
+ }
+
+ /**
+ * @return
+ */
+ private ArrayList<ClusterTopologyListener> copyListeners()
+ {
+ ArrayList <ClusterTopologyListener> listenersCopy;
+ synchronized (topologyListeners)
+ {
+ listenersCopy = new ArrayList<ClusterTopologyListener>(topologyListeners);
+ }
+ return listenersCopy;
+ }
+
+ public boolean removeMember(final String nodeId)
+ {
+ TopologyMember member;
- if (log.isDebugEnabled())
+ synchronized (this)
{
- log.debug("ZZZ " + this + " Add member nodeId=" + nodeId + " member = " + member + " replaced = " + replaced + " size = " + topology.size(), new Exception ("trace"));
+ member = topology.remove(nodeId);
}
-
- return replaced;
+
+ if (member != null)
+ {
+ ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+ for (ClusterTopologyListener listener : copy)
+ {
+ if (Topology.log.isTraceEnabled())
+ {
+ Topology.log.trace("XXX ZZZ " + this + " informing " + listener + " about node down = " + nodeId);
+ }
+ listener.nodeDown(nodeId);
+ }
+ }
+
+ if (Topology.log.isDebugEnabled())
+ {
+ Topology.log.debug("ZZZ " + this +
+ " removing nodeID=" +
+ nodeId +
+ ", result=" +
+ member +
+ ", size = " +
+ topology.size(), new Exception("trace"));
+ }
+
+ return member != null;
}
- public synchronized boolean removeMember(String nodeId)
+ /**
+ * Sends the member update to all listeners, regardless of whether the member actually changed
+ * @param nodeID
+ * @param member
+ */
+ public void sendMemberToListeners(String nodeID, TopologyMember member)
{
- TopologyMember member = topology.remove(nodeId);
- if (log.isDebugEnabled())
+ // To make sure it was updated
+ addMember(nodeID, member, false);
+
+ ArrayList<ClusterTopologyListener> copy = copyListeners();
+
+ // Now force sending it
+ for (ClusterTopologyListener listener : copy)
{
- log.debug("ZZZ " + this + " removing nodeID=" + nodeId + ", result=" + member + ", size = " + topology.size(), new Exception ("trace"));
+ if (log.isDebugEnabled())
+ {
+ log.debug("Informing client listener " + listener +
+ " about itself node " +
+ nodeID +
+ " with connector=" +
+ member.getConnector());
+ }
+ listener.nodeUP(nodeID, member.getConnector(), false);
}
- return (member != null);
}
- public void sendTopology(ClusterTopologyListener listener)
+ public synchronized void sendTopology(final ClusterTopologyListener listener)
{
int count = 0;
+
Map<String, TopologyMember> copy;
+
synchronized (this)
{
copy = new HashMap<String, TopologyMember>(topology);
}
+
for (Map.Entry<String, TopologyMember> entry : copy.entrySet())
{
listener.nodeUP(entry.getKey(), entry.getValue().getConnector(), ++count == copy.size());
}
}
- public TopologyMember getMember(String nodeID)
+ public TopologyMember getMember(final String nodeID)
{
return topology.get(nodeID);
}
@@ -169,8 +307,6 @@
int count = 0;
for (TopologyMember member : topology.values())
{
-
- // ARRUMAR ISSO
if (member.getConnector().a != null)
{
count++;
@@ -182,12 +318,13 @@
}
return count;
}
+
public synchronized String describe()
{
return describe("");
}
- public synchronized String describe(String text)
+ public synchronized String describe(final String text)
{
String desc = text + "\n";
@@ -201,11 +338,11 @@
public void clear()
{
- if (log.isDebugEnabled())
+ if (Topology.log.isDebugEnabled())
{
- log.debug("ZZZ " + this + "::clear", new Exception ("trace"));
+ Topology.log.debug("ZZZ III " + this + "::clear", new Exception("trace"));
}
- topology.clear();
+ // topology.clear();
}
public int members()
@@ -213,28 +350,30 @@
return topology.size();
}
- private boolean hasChanged(TransportConfiguration currentConnector, TransportConfiguration connector)
+ public void setOwner(final Object owner)
{
- return (currentConnector == null && connector != null) || (currentConnector != null && !currentConnector.equals(connector));
+ this.owner = owner;
}
- public TransportConfiguration getBackupForConnector(TransportConfiguration connectorConfiguration)
+ private boolean hasChanged(final TransportConfiguration currentConnector, final TransportConfiguration connector)
{
+ return currentConnector == null && connector != null ||
+ currentConnector != null &&
+ !currentConnector.equals(connector);
+ }
+
+ public TransportConfiguration getBackupForConnector(final TransportConfiguration connectorConfiguration)
+ {
for (TopologyMember member : topology.values())
{
- if(member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
+ if (member.getConnector().a != null && member.getConnector().a.equals(connectorConfiguration))
{
- return member.getConnector().b;
+ return member.getConnector().b;
}
}
return null;
}
- public void setDebug(boolean b)
- {
- debug = b;
- }
-
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@@ -250,5 +389,5 @@
return "Topology@" + Integer.toHexString(System.identityHashCode(this)) + "[owner=" + owner + "]";
}
}
-
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -118,13 +118,18 @@
{
public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
- channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
}
public void nodeDown(String nodeID)
{
- channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID));
+ channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
+
+ public String toString()
+ {
+ return "Remote Proxy on channel " + Integer.toHexString(System.identityHashCode(this));
+ }
};
final boolean isCC = msg.isClusterConnection();
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -105,7 +106,7 @@
private final boolean routeWhenNoConsumers;
- private final Map<String, MessageFlowRecord> records = new HashMap<String, MessageFlowRecord>();
+ private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<String, MessageFlowRecord>();
private final ScheduledExecutorService scheduledExecutor;
@@ -498,7 +499,7 @@
// ClusterTopologyListener implementation ------------------------------------------------------------------
- public synchronized void nodeDown(final String nodeID)
+ public void nodeDown(final String nodeID)
{
if (log.isDebugEnabled())
{
@@ -533,7 +534,7 @@
}
- public synchronized void nodeUP(final String nodeID,
+ public void nodeUP(final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -100,8 +100,6 @@
// the cluster connections which links this node to other cluster nodes
private final Map<String, ClusterConnection> clusterConnections = new HashMap<String, ClusterConnection>();
- private final Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
-
private final Topology topology = new Topology(this);
private volatile ServerLocatorInternal backupServerLocator;
@@ -258,10 +256,7 @@
});
started = false;
- topologyListeners.clear();
clusterConnections.clear();
- topology.clear();
-
}
public void notifyNodeDown(String nodeID)
@@ -273,16 +268,8 @@
log.debug("XXX " + this + "::removing nodeID=" + nodeID, new Exception ("trace"));
- boolean removed = topology.removeMember(nodeID);
+ topology.removeMember(nodeID);
- if (removed)
- {
-
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeDown(nodeID);
- }
- }
}
public void notifyNodeUp(final String nodeID,
@@ -296,7 +283,7 @@
}
TopologyMember member = new TopologyMember(connectorPair);
- boolean updated = topology.addMember(nodeID, member);
+ boolean updated = topology.addMember(nodeID, member, last);
if (!updated)
{
@@ -307,11 +294,6 @@
return;
}
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeUP(nodeID, member.getConnector(), last);
- }
-
if (log.isDebugEnabled())
{
log.debug("XXX " + this + " received notifyNodeUp nodeID=" + nodeID + " connectorPair=" + connectorPair +
@@ -365,7 +347,7 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
{
- topologyListeners.add(listener);
+ topology.addClusterTopologyListener(listener);
// We now need to send the current topology to the client
executor.execute(new Runnable(){
@@ -380,7 +362,7 @@
public void removeClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
{
- topologyListeners.add(listener);
+ topology.removeClusterTopologyListener(listener);
}
public Topology getTopology()
@@ -398,14 +380,10 @@
String nodeID = server.getNodeID().toString();
TopologyMember member = topology.getMember(nodeID);
- // we swap the topology backup now = live
- if (member != null)
- {
- member.getConnector().a = member.getConnector().b;
+ //swap backup as live and send it to everybody
+ member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b, null));
+ topology.addMember(nodeID, member, false);
- member.getConnector().b = null;
- }
-
if (backupServerLocator != null)
{
// todo we could use the topology of this to preempt it arriving from the cc
@@ -456,15 +434,8 @@
log.warn("unable to start bridge " + bridge.getName(), e);
}
}
-
- for (ClusterTopologyListener listener : topologyListeners)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Informing client listener " + listener + " about itself node " + nodeID + " with connector=" + member.getConnector());
- }
- listener.nodeUP(nodeID, member.getConnector(), false);
- }
+
+ topology.sendMemberToListeners(nodeID, member);
}
}
@@ -519,7 +490,7 @@
null));
}
- topology.addMember(nodeID, member);
+ topology.addMember(nodeID, member, false);
}
else
{
@@ -532,13 +503,6 @@
// pair.a = cc.getConnector();
}
}
-
- // Propagate the announcement
-
- for (ClusterTopologyListener listener : topologyListeners)
- {
- listener.nodeUP(nodeID, member.getConnector(), false);
- }
}
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -456,7 +456,7 @@
}
catch (Exception e)
{
- log.info("unable to restart server, please kill and restart manually", e);
+ log.warn("unable to restart server, please kill and restart manually", e);
}
}
});
@@ -465,6 +465,7 @@
}
catch (Exception e)
{
+ log.debug(e.getMessage(), e);
//hopefully it will work next call
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -265,14 +265,16 @@
{
if (nodes == topology.getMembers().size())
{
- return;
+
+ log.info("ZZZ III look up for topology on " + topology + " size = " + topology.getMembers().size());
+ return;
}
Thread.sleep(10);
}
while (System.currentTimeMillis() - start < timeout);
- String msg = "Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") nodes on server = " + server + ")\n Current topology:" + topology.describe();
+ String msg = "ZZZ Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") topology = " + topology + ")\n Current topology:" + topology.describe();
ClusterTestBase.log.error(msg);
@@ -1988,6 +1990,7 @@
{
for (int node : nodes)
{
+ log.info("#test start node " + node);
servers[node].setIdentity("server " + node);
ClusterTestBase.log.info("starting server " + servers[node]);
servers[node].start();
@@ -1997,20 +2000,14 @@
ClusterTestBase.log.info("started server " + node);
waitForServer(servers[node]);
-
- for (int i = 0 ; i <= node; i++)
- {
- try
- {
- log.info("Describing Server " + servers[i]);
- log.info(servers[i].describe());
- }
- catch (Throwable ignored)
- {
-
- }
- }
}
+
+ for (int node: nodes)
+ {
+ System.out.println(servers[node].describe());
+ }
+
+
}
protected void waitForServer(HornetQServer server)
@@ -2050,6 +2047,7 @@
log.info("Stopping nodes " + Arrays.toString(nodes));
for (int node : nodes)
{
+ log.info("#test stop server " + node);
if (servers[node] != null && servers[node].isStarted())
{
try
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -149,7 +149,6 @@
setupCluster();
startServers(5, 0);
- servers[0].getClusterManager().getTopology().setDebug(true);
setupSessionFactory(0, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -128,21 +128,12 @@
stopServers(0, 1);
}
- public void _testLoop() throws Exception
- {
- for (int i = 0; i < 1000; i++)
- {
- log.info("#test " + i);
- testStopStart();
- tearDown();
- setUp();
- }
- }
-
public void testRestartTest() throws Throwable
{
startServers(0, 1);
waitForTopology(servers[0], 2);
+
+ log.info("ZZZ Server 0 " + servers[0].describe());
// try
// {
@@ -160,15 +151,30 @@
for (int i = 0; i < 100; i++)
{
log.info("#stop #test #" + i);
+ Thread.sleep(500);
stopServers(1);
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
+ Thread.sleep(500);
startServers(1);
+ Thread.sleep(500);
waitForTopology(servers[0], 2, 2000);
+ waitForTopology(servers[1], 2, 2000);
}
}
+ public void testLoop() throws Exception
+ {
+ for (int i = 0; i < 100; i++)
+ {
+ log.info("#test " + i);
+ testStopStart();
+ tearDown();
+ setUp();
+ }
+ }
+
public void testStopStart() throws Exception
{
startServers(0, 1);
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-07-28 17:44:46 UTC (rev 11065)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-07-29 04:22:24 UTC (rev 11066)
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
@@ -26,6 +27,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.ClusterConnection;
@@ -194,6 +196,8 @@
startServers(0);
ServerLocator locator = createHAServerLocator();
+
+ ((ServerLocatorImpl)locator).getTopology().setOwner("ZZZ III testReceive");
final List<String> nodes = new ArrayList<String>();
final CountDownLatch upLatch = new CountDownLatch(5);
@@ -207,18 +211,32 @@
{
if(!nodes.contains(nodeID))
{
+ System.out.println("Node UP " + nodeID + " added");
+ log.info("ZZZ III Node UP " + nodeID + " added");
nodes.add(nodeID);
upLatch.countDown();
}
+ else
+ {
+ System.out.println("Node UP " + nodeID + " was already here");
+ log.info("ZZZ III Node UP " + nodeID + " was already here");
+ }
}
public void nodeDown(String nodeID)
{
if (nodes.contains(nodeID))
{
+ log.info("ZZZ III Node down " + nodeID + " accepted");
+ System.out.println("Node down " + nodeID + " accepted");
nodes.remove(nodeID);
downLatch.countDown();
}
+ else
+ {
+ log.info("ZZZ III Node down " + nodeID + " already removed");
+ System.out.println("Node down " + nodeID + " already removed");
+ }
}
});
12 years, 9 months
JBoss hornetq SVN: r11065 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-28 13:44:46 -0400 (Thu, 28 Jul 2011)
New Revision: 11065
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
Improvement on queue
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-07-28 17:01:30 UTC (rev 11064)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-07-28 17:44:46 UTC (rev 11065)
@@ -22,11 +22,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -726,9 +728,31 @@
public long getMessageCount()
{
- blockOnExecutorFuture();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicLong count = new AtomicLong(0);
- return getInstantMessageCount();
+ getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ count.set(getInstantMessageCount());
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ if (!latch.await(10, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Timed out on waiting for MessageCount");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ return count.get();
}
public long getInstantMessageCount()
12 years, 9 months
JBoss hornetq SVN: r11064 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: protocol/core/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-28 13:01:30 -0400 (Thu, 28 Jul 2011)
New Revision: 11064
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
test fixes
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 17:01:30 UTC (rev 11064)
@@ -78,7 +78,7 @@
private StaticConnector staticConnector = new StaticConnector();
- private final Topology topology = new Topology(this);
+ private final Topology topology;
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -358,11 +358,14 @@
}
}
- private ServerLocatorImpl(final boolean useHA,
+ private ServerLocatorImpl(final Topology topology,
+ final boolean useHA,
final DiscoveryGroupConfiguration discoveryGroupConfiguration,
final TransportConfiguration[] transportConfigs)
{
e.fillInStackTrace();
+
+ this.topology = topology;
this.ha = useHA;
this.discoveryGroupConfiguration = discoveryGroupConfiguration;
@@ -440,7 +443,7 @@
*/
public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
{
- this(useHA, groupConfiguration, null);
+ this(new Topology(null), useHA, groupConfiguration, null);
}
/**
@@ -450,9 +453,30 @@
*/
public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
{
- this(useHA, null, transportConfigs);
+ this(new Topology(null), useHA, null, transportConfigs);
}
+ /**
+ * Create a ServerLocatorImpl using UDP discovery to lookup cluster
+ *
+ * @param discoveryAddress
+ * @param discoveryPort
+ */
+ public ServerLocatorImpl(final Topology topology, final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ {
+ this(topology, useHA, groupConfiguration, null);
+ }
+
+ /**
+ * Create a ServerLocatorImpl using a static list of live servers
+ *
+ * @param transportConfigs
+ */
+ public ServerLocatorImpl(final Topology topology, final boolean useHA, final TransportConfiguration... transportConfigs)
+ {
+ this(topology, useHA, null, transportConfigs);
+ }
+
private TransportConfiguration selectConnector()
{
if (receivedTopology)
@@ -1187,7 +1211,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX YYY nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
+ log.debug("XXX ZZZ nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
}
removed = topology.removeMember(nodeID);
@@ -1236,7 +1260,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX YYY NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair);
+ log.debug("XXX ZZZ NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception ("trace"));
}
topology.addMember(nodeID, new TopologyMember(connectorPair));
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-28 17:01:30 UTC (rev 11064)
@@ -13,6 +13,7 @@
package org.hornetq.core.client.impl;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -52,6 +53,7 @@
public Topology(final Object owner)
{
this.owner = owner;
+ log.debug("ZZZ III Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception ("trace")); // Delete this line
}
/*
@@ -75,8 +77,12 @@
}
if(currentMember == null)
{
+ replaced = true;
+ if (log.isDebugEnabled())
+ {
+ log.debug("ZZZ " + this + " MEMBER WAS NULL, Add member nodeId=" + nodeId + " member = " + member + " replaced = " + replaced + " size = " + topology.size(), new Exception ("trace"));
+ }
topology.put(nodeId, member);
- replaced = true;
}
else
{
@@ -103,8 +109,14 @@
if(debug)
{
log.debug(this + "::Topology updated=" + replaced);
- log.debug(describe("After:"));
+ log.debug(describe(this + "::After:"));
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("ZZZ " + this + " Add member nodeId=" + nodeId + " member = " + member + " replaced = " + replaced + " size = " + topology.size(), new Exception ("trace"));
+ }
+
return replaced;
}
@@ -113,12 +125,12 @@
TopologyMember member = topology.remove(nodeId);
if (log.isDebugEnabled())
{
- log.debug("XXX " + this + " removing nodeID=" + nodeId + ", result=" + member, new Exception ("trace"));
+ log.debug("ZZZ " + this + " removing nodeID=" + nodeId + ", result=" + member + ", size = " + topology.size(), new Exception ("trace"));
}
return (member != null);
}
- public synchronized void sendTopology(ClusterTopologyListener listener)
+ public void sendTopology(ClusterTopologyListener listener)
{
int count = 0;
Map<String, TopologyMember> copy;
@@ -144,14 +156,21 @@
public Collection<TopologyMember> getMembers()
{
- return topology.values();
+ ArrayList<TopologyMember> members;
+ synchronized (this)
+ {
+ members = new ArrayList<TopologyMember>(topology.values());
+ }
+ return members;
}
- public int nodes()
+ public synchronized int nodes()
{
int count = 0;
for (TopologyMember member : topology.values())
{
+
+ // ARRUMAR ISSO
if (member.getConnector().a != null)
{
count++;
@@ -182,6 +201,10 @@
public void clear()
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("ZZZ " + this + "::clear", new Exception ("trace"));
+ }
topology.clear();
}
@@ -224,7 +247,7 @@
}
else
{
- return "Topology [owner=" + owner + "]";
+ return "Topology@" + Integer.toHexString(System.identityHashCode(this)) + "[owner=" + owner + "]";
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-28 17:01:30 UTC (rev 11064)
@@ -118,12 +118,12 @@
{
public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
+ channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
}
public void nodeDown(String nodeID)
{
- channel0.send(new ClusterTopologyChangeMessage(nodeID));
+ channel0.sendAndFlush(new ClusterTopologyChangeMessage(nodeID));
}
};
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 17:01:30 UTC (rev 11064)
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -34,7 +35,9 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.client.impl.ServerLocatorImpl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -70,7 +73,11 @@
private static final boolean isTrace = log.isTraceEnabled();
- private final org.hornetq.utils.ExecutorFactory executorFactory;
+ private final ExecutorFactory executorFactory;
+
+ private final Topology clusterManagerTopology;
+
+ private final Executor executor;
private final HornetQServer server;
@@ -127,6 +134,7 @@
private final ClusterManagerImpl manager;
public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ final Topology clusterManagerTopology,
final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
@@ -183,6 +191,8 @@
this.routeWhenNoConsumers = routeWhenNoConsumers;
this.executorFactory = executorFactory;
+
+ this.executor = executorFactory.getExecutor();
this.server = server;
@@ -203,6 +213,8 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
this.manager = manager;
+
+ this.clusterManagerTopology = clusterManagerTopology;
clusterConnector = new StaticClusterConnector(tcConfigs);
@@ -219,6 +231,7 @@
}
public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ final Topology clusterManagerTopology,
DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
@@ -275,6 +288,8 @@
this.routeWhenNoConsumers = routeWhenNoConsumers;
this.executorFactory = executorFactory;
+
+ this.executor = executorFactory.getExecutor();
this.server = server;
@@ -297,6 +312,8 @@
clusterConnector = new DiscoveryClusterConnector(dg);
this.manager = manager;
+
+ this.clusterManagerTopology = clusterManagerTopology;
}
public synchronized void start() throws Exception
@@ -352,12 +369,18 @@
props);
managementService.sendNotification(notification);
}
+
+ executor.execute(new Runnable(){
+ public void run()
+ {
+ if(serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
- if(serverLocator != null)
- {
- serverLocator.close();
- serverLocator = null;
- }
+ }
+ });
started = false;
}
@@ -1258,7 +1281,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs);
+ return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
}
else
{
@@ -1289,7 +1312,7 @@
public ServerLocatorInternal createServerLocator()
{
- return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg);
+ return new ServerLocatorImpl(clusterManagerTopology, true, dg);
}
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 17:01:30 UTC (rev 11064)
@@ -245,13 +245,17 @@
backupServerLocator = null;
}
- for (ServerLocator clusterLocator : clusterLocators)
+ executor.execute(new Runnable()
{
- log.info("WWW Closing clusterLocator " + clusterLocator);
- clusterLocator.close();
- log.info("WWW Closed clusterLocator " + clusterLocator);
- }
- clusterLocators.clear();
+ public void run()
+ {
+ for (ServerLocator clusterLocator : clusterLocators)
+ {
+ clusterLocator.close();
+ }
+ clusterLocators.clear();
+ }
+ });
started = false;
topologyListeners.clear();
@@ -829,6 +833,7 @@
}
clusterConnection = new ClusterConnectionImpl(this,
+ topology,
dg,
connector,
new SimpleString(config.getName()),
@@ -865,6 +870,7 @@
}
clusterConnection = new ClusterConnectionImpl(this,
+ topology,
tcConfigs,
connector,
new SimpleString(config.getName()),
12 years, 9 months
JBoss hornetq SVN: r11063 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 23:47:39 -0400 (Wed, 27 Jul 2011)
New Revision: 11063
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
fixes
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
@@ -648,28 +648,22 @@
if (ha || clusterConnection)
{
- long toWait = 30000;
- long start = System.currentTimeMillis();
- while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && toWait > 0)
+ long timeout = System.currentTimeMillis() + 30000;
+ while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && timeout > System.currentTimeMillis())
{
// Now wait for the topology
-
+
try
{
- wait(toWait);
+ wait(1000);
}
catch (InterruptedException ignore)
{
}
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
}
- if (toWait <= 0)
+ if (System.currentTimeMillis() > timeout && ! receivedTopology && !closed && !closing)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology");
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
@@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -125,7 +124,10 @@
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
- public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
+ private final ClusterManagerImpl manager;
+
+ public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
@@ -199,6 +201,8 @@
this.clusterPassword = clusterPassword;
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+
+ this.manager = manager;
clusterConnector = new StaticClusterConnector(tcConfigs);
@@ -214,7 +218,8 @@
}
- public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
+ public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
@@ -290,6 +295,8 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
clusterConnector = new DiscoveryClusterConnector(dg);
+
+ this.manager = manager;
}
public synchronized void start() throws Exception
@@ -646,7 +653,6 @@
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
targetLocator.setClusterConnection(true);
- targetLocator.setIdentity("(Cluster-connection-bridge::" + this.toString() + ")");
targetLocator.setRetryInterval(retryInterval);
targetLocator.setMaxRetryInterval(maxRetryInterval);
@@ -660,6 +666,8 @@
{
targetLocator.setRetryInterval(retryInterval);
}
+
+ manager.addClusterLocator(targetLocator);
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
targetLocator,
@@ -687,7 +695,10 @@
record,
record.getConnector());
- return bridge;
+
+ targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");
+
+ return bridge;
}
// Inner classes -----------------------------------------------------------------------------------
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
@@ -19,7 +19,6 @@
import java.io.StringWriter;
import java.lang.reflect.Array;
import java.net.InetAddress;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -37,6 +36,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
@@ -106,7 +106,7 @@
private volatile ServerLocatorInternal backupServerLocator;
- private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
+ private final Set<ServerLocator> clusterLocators = new ConcurrentHashSet<ServerLocator>();
private final Executor executor;
@@ -245,9 +245,11 @@
backupServerLocator = null;
}
- for (ServerLocatorInternal clusterLocator : clusterLocators)
+ for (ServerLocator clusterLocator : clusterLocators)
{
+ log.info("WWW Closing clusterLocator " + clusterLocator);
clusterLocator.close();
+ log.info("WWW Closed clusterLocator " + clusterLocator);
}
clusterLocators.clear();
started = false;
@@ -483,6 +485,11 @@
log.warn("no cluster connections defined, unable to announce backup");
}
}
+
+ void addClusterLocator(final ServerLocatorInternal serverLocator)
+ {
+ this.clusterLocators.add(serverLocator);
+ }
private synchronized void announceNode()
{
@@ -721,7 +728,9 @@
log.debug("Bridge " + config.getName() +
" is configured to not use duplicate detecion, it will send messages synchronously");
}
+
clusterLocators.add(serverLocator);
+
Bridge bridge = new BridgeImpl(serverLocator,
config.getReconnectAttempts(),
config.getRetryInterval(),
@@ -819,7 +828,8 @@
log.debug("XXX " + this + " Starting a Discovery Group Cluster Connection, name=" + config.getDiscoveryGroupName() + ", dg=" + dg);
}
- clusterConnection = new ClusterConnectionImpl(dg,
+ clusterConnection = new ClusterConnectionImpl(this,
+ dg,
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
@@ -854,7 +864,8 @@
log.debug("XXX " + this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
}
- clusterConnection = new ClusterConnectionImpl(tcConfigs,
+ clusterConnection = new ClusterConnectionImpl(this,
+ tcConfigs,
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
12 years, 9 months
JBoss hornetq SVN: r11062 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 23:01:31 -0400 (Wed, 27 Jul 2011)
New Revision: 11062
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
Log:
tweak
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 01:34:23 UTC (rev 11061)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 03:01:31 UTC (rev 11062)
@@ -46,7 +46,6 @@
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -69,7 +68,7 @@
private String identity;
- private final Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
+ private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
12 years, 9 months
JBoss hornetq SVN: r11061 - in branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests: integration/cluster/failover and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 21:34:23 -0400 (Wed, 27 Jul 2011)
New Revision: 11061
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-28 01:32:47 UTC (rev 11060)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-07-28 01:34:23 UTC (rev 11061)
@@ -249,6 +249,11 @@
protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
{
+ waitForTopology(server, nodes, WAIT_TIMEOUT);
+ }
+
+ protected void waitForTopology(final HornetQServer server, final int nodes, final long timeout) throws Exception
+ {
log.debug("waiting for " + nodes + " on the topology for server = " + server);
@@ -265,16 +270,13 @@
Thread.sleep(10);
}
- while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
+ while (System.currentTimeMillis() - start < timeout);
String msg = "Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") nodes on server = " + server + ")\n Current topology:" + topology.describe();
ClusterTestBase.log.error(msg);
throw new Exception (msg);
-
-
-
}
protected void waitForBindings(final int node,
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-07-28 01:32:47 UTC (rev 11060)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2011-07-28 01:34:23 UTC (rev 11061)
@@ -88,6 +88,18 @@
verifyReceiveAll(10, 0);
}
+
+ public void _testLoop() throws Exception
+ {
+ for (int i = 0 ; i < 100; i++)
+ {
+ log.info("#test " + i);
+ testRequestResponseNoWaitForBindings();
+ tearDown();
+ setUp();
+ }
+
+ }
/*
* Don't wait for the response queue bindings to get to the other side
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-28 01:32:47 UTC (rev 11060)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-28 01:34:23 UTC (rev 11061)
@@ -36,11 +36,11 @@
setupServers();
setupClusters();
}
-
+
protected void setupServers()
{
setupServer(0, isFileStorage(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
}
protected void setupClusters()
@@ -95,7 +95,7 @@
stopServers(0, 1);
}
-
+
public void testStartPauseStartOther() throws Exception
{
@@ -104,11 +104,11 @@
setupSessionFactory(0, isNetty());
createQueue(0, "queues", "queue0", null, false);
addConsumer(0, 0, "queue0", null);
-
- // we let the discovery initial timeout expire,
+
+ // we let the discovery initial timeout expire,
// #0 will be alone in the cluster
Thread.sleep(12000);
-
+
startServers(1);
setupSessionFactory(1, isNetty());
createQueue(1, "queues", "queue0", null, false);
@@ -127,10 +127,10 @@
stopServers(0, 1);
}
-
+
public void _testLoop() throws Exception
{
- for (int i = 0 ; i < 100; i++)
+ for (int i = 0; i < 1000; i++)
{
log.info("#test " + i);
testStopStart();
@@ -139,6 +139,36 @@
}
}
+ public void testRestartTest() throws Throwable
+ {
+ startServers(0, 1);
+ waitForTopology(servers[0], 2);
+
+ // try
+ // {
+ // stopServers(1);
+ // waitForTopology(servers[0], 1);
+ // startServers(1);
+ // waitForTopology(servers[0], 2);
+ // stopServers(0,1);
+ // }
+ // catch (Throwable e)
+ // {
+ // e.printStackTrace(System.out);
+ // throw e;
+ // }
+ for (int i = 0; i < 100; i++)
+ {
+ log.info("#stop #test #" + i);
+ stopServers(1);
+ waitForTopology(servers[0], 1, 2000);
+ log.info("#start #test #" + i);
+ startServers(1);
+ waitForTopology(servers[0], 2, 2000);
+ }
+
+ }
+
public void testStopStart() throws Exception
{
startServers(0, 1);
@@ -163,23 +193,27 @@
verifyNotReceive(0, 1);
removeConsumer(1);
-
+
closeSessionFactory(1);
-
+
log.info("*********** Stopping server 1");
stopServers(1);
log.info("*********** Stopped server 1");
System.out.println(clusterDescription(servers[0]));
-
+
// Sleep some time as the node should be retrying
// The retry here is part of the test
- Thread.sleep(1000);
+ Thread.sleep(10);
- log.info ("********* Starting server 1");
+ waitForTopology(servers[0], 1);
+
+ log.info("********* Starting server 1");
startServers(1);
- log.info ("********* Describing servers");
+ waitForTopology(servers[0], 2);
+
+ log.info("********* Describing servers");
log.info(servers[0].describe());
log.info(servers[1].describe());
@@ -199,6 +233,6 @@
verifyReceiveRoundRobin(10, 0, 1);
verifyNotReceive(0, 1);
- stopServers(0, 1);
+ stopServers(0, 1);
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-28 01:32:47 UTC (rev 11060)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-28 01:34:23 UTC (rev 11061)
@@ -214,8 +214,7 @@
}
catch (IOException e)
{
- e.printStackTrace();
- System.exit(9);
+ throw e;
}
try
{
@@ -224,8 +223,7 @@
}
catch (IOException e)
{
- e.printStackTrace();
- System.exit(9);
+ throw e;
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-07-28 01:32:47 UTC (rev 11060)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/util/UnitTestCase.java 2011-07-28 01:34:23 UTC (rev 11061)
@@ -961,6 +961,7 @@
if (invmSize > 0)
{
InVMRegistry.instance.clear();
+ log.info(threadDump("Thread dump"));
fail("invm registry still had acceptors registered");
}
12 years, 9 months
JBoss hornetq SVN: r11060 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 21:32:47 -0400 (Wed, 27 Jul 2011)
New Revision: 11060
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 01:32:47 UTC (rev 11060)
@@ -46,6 +46,7 @@
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -65,10 +66,10 @@
private boolean finalizeCheck = true;
private boolean clusterConnection;
-
+
private String identity;
- private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+ private final Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
private Set<ClientSessionFactory> factories = new HashSet<ClientSessionFactory>();
@@ -159,7 +160,7 @@
private final List<Interceptor> interceptors = new CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
-
+
private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
@@ -476,7 +477,7 @@
public void start(Executor executor) throws Exception
{
initialise();
-
+
this.startExecutor = executor;
executor.execute(new Runnable()
@@ -650,7 +651,7 @@
{
long toWait = 30000;
long start = System.currentTimeMillis();
- while (!receivedTopology && toWait > 0)
+ while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && toWait > 0)
{
// Now wait for the topology
@@ -674,12 +675,14 @@
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology");
}
+
}
addFactory(factory);
return factory;
}
+
}
public boolean isHA()
@@ -1037,7 +1040,7 @@
throw new IllegalStateException("Cannot set attribute on SessionFactory after it has been used");
}
}
-
+
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1107,7 +1110,7 @@
if (log.isDebugEnabled())
{
- log.debug("YYY " + this + " is calling close", new Exception ("trace"));
+ log.debug("YYY " + this + " is calling close", new Exception("trace"));
}
closing = true;
@@ -1188,7 +1191,7 @@
}
return;
}
-
+
if (log.isDebugEnabled())
{
log.debug("XXX YYY nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
@@ -1229,7 +1232,11 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::Ignoring notifyNodeUp for " + nodeID + " connectorPair=" + connectorPair + ", since ha=false and clusterConnection=false");
+ log.debug(this + "::Ignoring notifyNodeUp for " +
+ nodeID +
+ " connectorPair=" +
+ connectorPair +
+ ", since ha=false and clusterConnection=false");
}
return;
}
@@ -1277,9 +1284,11 @@
@Override
public String toString()
{
- if (clusterConnection)
+ if (identity != null)
{
- return "ServerLocatorImpl (clusterConnection identity=" + identity + ") [initialConnectors=" + Arrays.toString(initialConnectors) +
+ return "ServerLocatorImpl (identity=" + identity +
+ ") [initialConnectors=" +
+ Arrays.toString(initialConnectors) +
", discoveryGroupConfiguration=" +
discoveryGroupConfiguration +
"]";
@@ -1444,10 +1453,14 @@
}
}
});
-
+
if (log.isDebugEnabled())
{
- log.debug("XXX Returning " + csf + " after " + retryNumber + " retries on StaticConnector " + ServerLocatorImpl.this);
+ log.debug("XXX Returning " + csf +
+ " after " +
+ retryNumber +
+ " retries on StaticConnector " +
+ ServerLocatorImpl.this);
}
return csf;
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-28 01:32:47 UTC (rev 11060)
@@ -625,7 +625,7 @@
BridgeImpl.log.debug("Connecting " + this + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
retryCount++;
-
+
try
{
if (csf == null || csf.isClosed())
@@ -712,12 +712,15 @@
// We are not going to count this one as a retry
retryCount--;
- scheduleRetryConnectFixedTimeout(100);
+ scheduleRetryConnectFixedTimeout(this.retryInterval);
return;
}
else
{
- BridgeImpl.log.warn("Bridge " + this + " is unable to connect to destination. Retrying", e);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Bridge " + this + " is unable to connect to destination. Retrying", e);
+ }
}
}
catch (Exception e)
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 01:32:47 UTC (rev 11060)
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -31,9 +32,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -498,9 +497,9 @@
{
log.error("Failed to close flow record", e);
}
+
+ server.getClusterManager().notifyNodeDown(nodeID);
}
-
- server.getClusterManager().notifyNodeDown(nodeID);
}
@@ -538,6 +537,7 @@
// and empty static connectors to create bridges... ulgy!
if (serverLocator == null)
{
+ log.warn("ServerLocator==null FixME!!!!!");
return;
}
/*we dont create bridges to backups*/
@@ -633,7 +633,7 @@
protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
{
- ServerLocatorInternal targetLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+ final ServerLocatorInternal targetLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
targetLocator.setReconnectAttempts(0);
@@ -656,8 +656,6 @@
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
- targetLocator.addClusterTopologyListener(this);
-
if(retryInterval > 0)
{
targetLocator.setRetryInterval(retryInterval);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 01:32:47 UTC (rev 11060)
@@ -265,7 +265,7 @@
return;
}
- log.info("XXX " + this + "::removing nodeID=" + nodeID);
+ log.debug("XXX " + this + "::removing nodeID=" + nodeID, new Exception ("trace"));
boolean removed = topology.removeMember(nodeID);
12 years, 9 months
JBoss hornetq SVN: r11059 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 16:03:39 -0400 (Wed, 27 Jul 2011)
New Revision: 11059
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-27 20:02:45 UTC (rev 11058)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-07-27 20:03:39 UTC (rev 11059)
@@ -172,7 +172,9 @@
System.out.println(clusterDescription(servers[0]));
- Thread.sleep(5000);
+ // Sleep some time as the node should be retrying
+ // The retry here is part of the test
+ Thread.sleep(1000);
log.info ("********* Starting server 1");
startServers(1);
12 years, 9 months
JBoss hornetq SVN: r11058 - branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 16:02:45 -0400 (Wed, 27 Jul 2011)
New Revision: 11058
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
fixing test
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-27 17:29:18 UTC (rev 11057)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-27 20:02:45 UTC (rev 11058)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
@@ -652,8 +653,10 @@
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
targetLocator.setNodeID(serverLocator.getNodeID());
+
+ targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
- targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
+ targetLocator.addClusterTopologyListener(this);
if(retryInterval > 0)
{
12 years, 9 months
JBoss hornetq SVN: r11057 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-27 13:29:18 -0400 (Wed, 27 Jul 2011)
New Revision: 11057
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
Log:
throw the right kind of exception.
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-07-27 17:28:44 UTC (rev 11056)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-07-27 17:29:18 UTC (rev 11057)
@@ -26,9 +26,9 @@
import org.hornetq.core.logging.Logger;
/**
- *
+ *
* A AIOSequentialFile
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
@@ -227,7 +227,7 @@
public void sync() throws Exception
{
- throw new IllegalArgumentException("This method is not supported on AIO");
+ throw new UnsupportedOperationException("This method is not supported on AIO");
}
public long size() throws Exception
@@ -268,7 +268,7 @@
}
/**
- *
+ *
* @param sync Not used on AIO
* */
public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
12 years, 9 months