[jboss-cvs] JBoss Messaging SVN: r8171 - branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jan 18 22:52:45 EST 2011
Author: gaohoward
Date: 2011-01-18 22:52:45 -0500 (Tue, 18 Jan 2011)
New Revision: 8171
Modified:
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java
branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
Log:
save work
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java 2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java 2011-01-19 03:52:45 UTC (rev 8171)
@@ -65,6 +65,8 @@
public static final int ADD_ALL_REPLICATED_DELIVERIES_REQUEST = 12;
public static final int GET_REPLICATED_DELIVERIES_REQUEST = 13;
+
+ public static final int NODE_DEAD_REQUEST = 20;
protected static final int NULL = 0;
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2011-01-19 03:52:45 UTC (rev 8171)
@@ -258,8 +258,13 @@
public void multicastControl(ClusterRequest request, boolean sync) throws Exception
{
- if (ready.get())
- {
+ if (!requestTarget.isAvailable())
+ {
+ if (trace) { log.trace(this + " the request target is not available"); }
+ }
+
+ if (ready.get())
+ {
if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
Message message = new Message(null, null, writeRequest(request));
@@ -290,7 +295,12 @@
public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
{
- if (ready.get())
+ if (!requestTarget.isAvailable())
+ {
+ if (trace) { log.trace(this + " the request target is not available"); }
+ }
+
+ if (ready.get())
{
if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
@@ -321,6 +331,11 @@
public void multicastData(ClusterRequest request) throws Exception
{
+ if (!requestTarget.isAvailable())
+ {
+ if (trace) { log.trace(this + " the request target is not available"); }
+ }
+
if (ready.get())
{
if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
@@ -333,7 +348,12 @@
public void unicastData(ClusterRequest request, Address address) throws Exception
{
- if (ready.get())
+ if (!requestTarget.isAvailable())
+ {
+ if (trace) { log.trace(this + " the request target is not available"); }
+ }
+
+ if (ready.get())
{
if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-01-19 03:52:45 UTC (rev 8171)
@@ -528,12 +528,6 @@
}
new CleanupDeadNode().executeWithRetry();
}
-
- //broadcast
- NodeDeadRequest request = new NodeDeadRequest(qNode);
-
- groupMember.multicastControl(request, true);
-
}
/*
@@ -544,24 +538,26 @@
private void processClusterState() throws Exception
{
Iterator<Integer> iter = quarantinedNodes.keySet().iterator();
+
+ ClusterState clusterStateCopy = clusterState.copy();
+
while (iter.hasNext())
{
Integer qNodeID = iter.next();
- if (clusterState.isNodeDead(qNodeID))
+ if (clusterStateCopy.isNodeDead(qNodeID))
{
+ QuarantinedNode qNode = quarantinedNodes.get(qNodeID);
+ Integer fNodeID = qNode.getFailover();
+
Integer foNodeID = (Integer)failoverMap.get(qNodeID);
if (foNodeID == null)
{
- //that doesn't mean it hadn't one before, possible cases:
- // 1 its failover node was also quarantined.
- // 2 its failover node was already dead.
- // 3 really messed up.
- //for 1 and 2, we maintain the relationship such that each quarantined node
- //points to a node in an active node in the cluster.
- throw new IllegalStateException("Cannot find failover node for node " + qNodeID);
+ throw new IllegalStateException("Failover node " + fNodeID + " for node " + qNode + " is not alive!");
}
- if (foNodeID.intValue() == thisNodeID)
+ boolean failoverDone = false;
+
+ if (fNodeID.intValue() == thisNodeID)
{
//I am the failover node for the dead, perform failover now
if (quarantinedNodes.get(qNodeID).shouldFailover() && isSupportsFailover())
@@ -572,22 +568,26 @@
//now clean up the quarantined set
cleanUpQuarantinedNode(qNodeID);
+
+ //broadcast
+ NodeDeadRequest request = new NodeDeadRequest(qNodeID);
+
+ groupMember.multicastControl(request, true);
+
+ failoverDone = true;
}
}
- else
+
+ if ((!failoverDone) && (!clusterStateCopy.isQuarantined(qNodeID)))
{
//I am not the failover node for the dead, clean up now. But don't delete it from quarantined map
//nor from the DB. we wait for it to be failed over. Two possible cases:
//1 this node later becomes the failover for the dead node, so this node cleans up it.
//2 other node failed over the dead node, this node get notified and clean up it.
+ cleanDataForNode(qNodeID);
}
}
}
-
- if (clusterState.isQuarantined(thisNodeID))
- {
- //I am quarantined, work as standalone now
- }
}
//timestamp and query the new state from db
@@ -614,17 +614,21 @@
ps.executeUpdate();
- //collect states
- ps = conn.prepareStatement(getSQLStatement("LOAD_CLUSTER_STATE"));
- ResultSet result = ps.executeQuery();
-
- while (result.next())
+ synchronized (clusterState)
{
- int nodeID = result.getInt(1);
- long timestamp = result.getLong(2);
- int nodeState = result.getInt(3);
-
- clusterState.addNode(nodeID, timestamp, nodeState);
+ clusterState.clear();
+ // collect states
+ ps = conn.prepareStatement(getSQLStatement("LOAD_CLUSTER_STATE"));
+ ResultSet result = ps.executeQuery();
+
+ while (result.next())
+ {
+ int nodeID = result.getInt(1);
+ long timestamp = result.getLong(2);
+ int nodeState = result.getInt(3);
+
+ clusterState.addNode(nodeID, timestamp, nodeState);
+ }
}
}
finally
@@ -1341,7 +1345,28 @@
*/
public void nodeJoined(Address address) throws Exception
{
- log.debug(this + ": " + address + " joined");
+ log.debug(this + ": " + address + " joined");
+ Integer newNode = findNodeIDForAddress(address);
+
+ QuarantinedNode qNode = quarantinedNodes.remove(newNode);
+
+ if (qNode == null) return;
+
+ log.debug("A quarantined node " + qNode + " re-joined cluster.");
+
+
+ //if I am the quarantined node, I do rejoin.
+ if (clusterState.isQuarantined(thisNodeID))
+ {
+ updateStateInStorage(thisNodeID, STATE_CLUSTERED);
+ PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
+ groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+ }
+ else
+ {
+ //accept rejoin (suckers?)
+
+ }
}
public void nodesLeft(List addresses) throws Throwable
@@ -1618,6 +1643,14 @@
sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
}
+
+ //Invoked through NodeDeadRequest.execute() once a quarantined node has been
+ //failed over and is finally dead, so it's safe to drop it from the quarantine map.
+ //The entry may already be gone (presumably cleanUpQuarantinedNode on the failover
+ //node removed it before broadcasting), so only log when something was removed.
+ public void handleNodeDead(int nodeId)
+ {
+ QuarantinedNode node = quarantinedNodes.remove(nodeId);
+ if (node != null)
+ {
+ log.info("Quarantined node " + node + " is finally dead.");
+ }
+ }
/**
* @param originatorNodeID - the ID of the node that initiated the modification.
@@ -3833,11 +3866,30 @@
{
Map<Integer, NodeState> states = new HashMap<Integer, NodeState>();
+ private ClusterState(Map<Integer, NodeState> copy)
+ {
+ states = new HashMap<Integer, NodeState>(copy);
+ }
+
+ public void clear()
+ {
+ states.clear();
+ }
+
+ public ClusterState()
+ {
+ }
+
public void addNode(int nodeID, long timestamp, int nodeState)
{
states.put(nodeID, new NodeState(nodeID, timestamp, nodeState));
}
+ public synchronized ClusterState copy()
+ {
+ return new ClusterState(states);
+ }
+
public boolean isQuarantined(int qNodeID)
{
NodeState nState = states.get(qNodeID);
@@ -3882,4 +3934,9 @@
}
}
+ //Reports whether this node can act as a request target: it is available only
+ //while it is NOT quarantined in the most recently loaded cluster state.
+ public boolean isAvailable()
+ {
+ return !clusterState.isQuarantined(thisNodeID);
+ }
+
}
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java 2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java 2011-01-19 03:52:45 UTC (rev 8171)
@@ -43,6 +43,10 @@
{
leftNode = qNode.intValue();
}
+
+ public NodeDeadRequest()
+ {
+ }
public void write(DataOutputStream out) throws Exception
{
@@ -56,30 +60,12 @@
Object execute(RequestTarget office) throws Throwable
{
+ office.handleNodeDead(leftNode);
return null;
}
byte getType()
{
- return 0;
+ return ClusterRequest.NODE_DEAD_REQUEST;
}
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java 2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java 2011-01-19 03:52:45 UTC (rev 8171)
@@ -68,4 +68,8 @@
void handleAddAllReplicatedDeliveries(int nodeID, Map deliveries) throws Exception;
void handleGetReplicatedDeliveries(String queueName, Address returnAddress) throws Exception;
+
+ void handleNodeDead(int nodeId);
+
+ boolean isAvailable();
}
More information about the jboss-cvs-commits mailing list