Author: clebert.suconic(a)jboss.com
Date: 2011-08-30 15:53:38 -0400 (Tue, 30 Aug 2011)
New Revision: 11246
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
Log:
tweaks on my branch
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30
17:51:05 UTC (rev 11245)
+++
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-30
19:53:38 UTC (rev 11246)
@@ -36,16 +36,12 @@
public class Topology implements Serializable
{
- private static final int BACKOF_TIMEOUT = 500;
-
private static final long serialVersionUID = -9037171688692471371L;
private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
private static final Logger log = Logger.getLogger(Topology.class);
- private transient HashMap<String, Pair<Long, Integer>> mapBackof = new
HashMap<String, Pair<Long, Integer>>();
-
private Executor executor = null;
/** Used to debug operations.
@@ -142,7 +138,7 @@
mapTopology.put(nodeId, currentMember);
}
- TopologyMember newMember = new TopologyMember(currentMember.getConnector().a,
memberInput.getConnector().b);
+ TopologyMember newMember = new TopologyMember(currentMember.getA(),
memberInput.getB());
newMember.setUniqueEventID(System.currentTimeMillis());
mapTopology.remove(nodeId);
mapTopology.put(nodeId, newMember);
@@ -162,12 +158,6 @@
*/
public boolean updateMember(final long uniqueEventID, final String nodeId, final
TopologyMember memberInput)
{
-
-// if (memberInput.getConnector().a == null && memberInput.getConnector().b
!= null)
-// {
-// updateBackup(nodeId, memberInput);
-// return true;
-// }
Long deleteTme = mapDelete.get(nodeId);
if (deleteTme != null && uniqueEventID < deleteTme)
@@ -216,7 +206,22 @@
memberInput);
}
- TopologyMember newMember = new
TopologyMember(memberInput.getConnector().a, memberInput.getConnector().b);
+ TopologyMember newMember = new TopologyMember(currentMember.getA(),
memberInput.getB());
+
+ if (memberInput.getA() == null && memberInput.getB() != null)
+ {
+ // Updating what appears to be a backup update
+ newMember.setA(currentMember.getA());
+ }
+ else
+ if (currentMember.getA() == null && currentMember.getB() != null
&& newMember.getA() != null && newMember.getB() == null)
+ {
+ // This is a situation where we have:
+ // CurrentMember (null, X) && Input(X, null)
+ // This means the backup has arrived before, hence we need to merge the
results
+ newMember.setA(currentMember.getA());
+ }
+
newMember.setUniqueEventID(uniqueEventID);
mapTopology.remove(nodeId);
mapTopology.put(nodeId, newMember);
@@ -276,46 +281,6 @@
}
/**
- * @param nodeId
- * @param backOfData
- */
- private boolean testBackof(final String nodeId)
- {
- Pair<Long, Integer> backOfData = mapBackof.get(nodeId);
-
- if (backOfData != null)
- {
- backOfData.b += 1;
-
- long timeDiff = System.currentTimeMillis() - backOfData.a;
-
- // To prevent a loop where nodes are being considered down and up
- if (backOfData.b > 5 && timeDiff < BACKOF_TIMEOUT)
- {
-
- // The cluster may get in loop without this..
- // Case one node is stll sending nodeDown while another member is sending
nodeUp
- log.warn(backOfData.b + ", The topology controller identified a blast
events and it's interrupting the flow of the loop, nodeID=" +
- nodeId +
- ", topologyInstance=" +
- this,
- new Exception("this exception is just to trace
location"));
- return false;
- }
- else if (timeDiff < BACKOF_TIMEOUT)
- {
- log.warn(this + "::Simple blast of " + nodeId, new
Exception("this exception is just to trace location"));
- }
- else if (timeDiff >= BACKOF_TIMEOUT)
- {
- mapBackof.remove(nodeId);
- }
- }
-
- return true;
- }
-
- /**
* @return
*/
private ArrayList<ClusterTopologyListener> copyListeners()
@@ -500,11 +465,11 @@
int count = 0;
for (TopologyMember member : mapTopology.values())
{
- if (member.getConnector().a != null)
+ if (member.getA() != null)
{
count++;
}
- if (member.getConnector().b != null)
+ if (member.getB() != null)
{
count++;
}
@@ -551,32 +516,13 @@
this.owner = owner;
}
- private boolean hasChanged(final String debugInfo, final TransportConfiguration a,
final TransportConfiguration b)
- {
- boolean changed = a == null && b != null || a != null && b != null
&& !a.equals(b);
-
- if (log.isTraceEnabled())
- {
-
- log.trace(this + "::Validating current=" +
- a +
- " != input=" +
- b +
- (changed ? " and it has changed" : " and it didn't
change") +
- ", for validation of " +
- debugInfo);
- }
-
- return changed;
- }
-
public TransportConfiguration getBackupForConnector(final TransportConfiguration
connectorConfiguration)
{
for (TopologyMember member : mapTopology.values())
{
- if (member.getConnector().a != null &&
member.getConnector().a.equals(connectorConfiguration))
+ if (member.getA() != null &&
member.getA().equals(connectorConfiguration))
{
- return member.getConnector().b;
+ return member.getB();
}
}
return null;
Modified:
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-30
17:51:05 UTC (rev 11245)
+++
branches/Branch_2_2_EAP_cluster_clean3/src/main/org/hornetq/core/client/impl/TopologyMember.java 2011-08-30
19:53:38 UTC (rev 11246)
@@ -41,7 +41,27 @@
this(new Pair<TransportConfiguration, TransportConfiguration>(a, b));
}
+
+ public TransportConfiguration getA()
+ {
+ return connector.a;
+ }
+ public TransportConfiguration getB()
+ {
+ return connector.b;
+ }
+
+ public void setB(TransportConfiguration param)
+ {
+ this.connector.b = param;
+ }
+
+ public void setA(TransportConfiguration param)
+ {
+ this.connector.a = param;
+ }
+
/**
* @return the uniqueEventID
*/