[jboss-cvs] JBoss Messaging SVN: r8171 - branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jan 18 22:52:45 EST 2011


Author: gaohoward
Date: 2011-01-18 22:52:45 -0500 (Tue, 18 Jan 2011)
New Revision: 8171

Modified:
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java
   branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
Log:
save work


Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java	2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java	2011-01-19 03:52:45 UTC (rev 8171)
@@ -65,6 +65,8 @@
 	public static final int ADD_ALL_REPLICATED_DELIVERIES_REQUEST = 12;
 	
 	public static final int GET_REPLICATED_DELIVERIES_REQUEST = 13;
+	
+	public static final int NODE_DEAD_REQUEST = 20;
 		
 	
 	protected static final int NULL = 0;

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-01-19 03:52:45 UTC (rev 8171)
@@ -258,8 +258,13 @@
    
    public void multicastControl(ClusterRequest request, boolean sync) throws Exception
    {
-   	if (ready.get())
-   	{   		
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
+      if (ready.get())
+   	{
 	   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
 	
 	   	Message message = new Message(null, null, writeRequest(request));
@@ -290,7 +295,12 @@
    
    public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
    {
-   	if (ready.get())
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
+      if (ready.get())
    	{   		
 	   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
 	
@@ -321,6 +331,11 @@
    
    public void multicastData(ClusterRequest request) throws Exception
    {
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
    	if (ready.get())
    	{   		
 	   	if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
@@ -333,7 +348,12 @@
    
    public void unicastData(ClusterRequest request, Address address) throws Exception
    {
-   	if (ready.get())
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
+      if (ready.get())
    	{
 	   	if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
 	

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-19 03:52:45 UTC (rev 8171)
@@ -528,12 +528,6 @@
          }
          new CleanupDeadNode().executeWithRetry();
       }
-      
-      //broadcast
-      NodeDeadRequest request = new NodeDeadRequest(qNode);
-      
-      groupMember.multicastControl(request, true);
-      
    }
    
    /*
@@ -544,24 +538,26 @@
    private void processClusterState() throws Exception
    {
       Iterator<Integer> iter = quarantinedNodes.keySet().iterator();
+      
+      ClusterState clusterStateCopy = clusterState.copy();
+      
       while (iter.hasNext())
       {
          Integer qNodeID = iter.next();
-         if (clusterState.isNodeDead(qNodeID))
+         if (clusterStateCopy.isNodeDead(qNodeID))
          {
+            QuarantinedNode qNode = quarantinedNodes.get(qNodeID);
+            Integer fNodeID = qNode.getFailover();
+            
             Integer foNodeID = (Integer)failoverMap.get(qNodeID);
             if (foNodeID == null)
             {
-               //that doesn't mean it hadn't one before, possible cases:
-               // 1 its failover node was also quarantined.
-               // 2 its failover node was already dead.
-               // 3 really messed up.
-               //for 1 and 2, we maintain the relationship such that each quarantined node
-               //points to a node in an active node in the cluster.
-               throw new IllegalStateException("Cannot find failover node for node " + qNodeID);
+               throw new IllegalStateException("Failover node " + fNodeID + " for node " + qNode + " is not alive!");
             }
             
-            if (foNodeID.intValue() == thisNodeID)
+            boolean failoverDone = false;
+            
+            if (fNodeID.intValue() == thisNodeID)
             {
                //I am the failover node for the dead, perform failover now
                if (quarantinedNodes.get(qNodeID).shouldFailover() && isSupportsFailover())
@@ -572,22 +568,26 @@
                    
                    //now clean up the quarantined set
                    cleanUpQuarantinedNode(qNodeID);
+                   
+                   //broadcast
+                   NodeDeadRequest request = new NodeDeadRequest(qNodeID);
+                   
+                   groupMember.multicastControl(request, true);
+                   
+                   failoverDone = true;
                }
             }
-            else
+
+            if ((!failoverDone) && (!clusterStateCopy.isQuarantined(qNodeID)))
             {
                //I am not the failover node for the dead, clean up now. But don't delete it from quarantined map
                //nor from the DB. we wait for it to be failed over. Two possible cases:
                //1 this node later becomes the failover for the dead node, so this node cleans up it.
                //2 other node failed over the dead node, this node get notified and clean up it.
+               cleanDataForNode(qNodeID);
             }
          }
       }
-      
-      if (clusterState.isQuarantined(thisNodeID))
-      {
-         //I am quarantined, work as standalone now
-      }
    }
    
    //timestamp and query the new state from db
@@ -614,17 +614,21 @@
 
                ps.executeUpdate();
                
-               //collect states
-               ps = conn.prepareStatement(getSQLStatement("LOAD_CLUSTER_STATE"));
-               ResultSet result = ps.executeQuery();
-               
-               while (result.next())
+               synchronized (clusterState)
                {
-                  int nodeID = result.getInt(1);
-                  long timestamp = result.getLong(2);
-                  int nodeState = result.getInt(3);
-                  
-                  clusterState.addNode(nodeID, timestamp, nodeState);
+                  clusterState.clear();
+                  // collect states
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_CLUSTER_STATE"));
+                  ResultSet result = ps.executeQuery();
+
+                  while (result.next())
+                  {
+                     int nodeID = result.getInt(1);
+                     long timestamp = result.getLong(2);
+                     int nodeState = result.getInt(3);
+
+                     clusterState.addNode(nodeID, timestamp, nodeState);
+                  }
                }
             }
             finally
@@ -1341,7 +1345,28 @@
     */
    public void nodeJoined(Address address) throws Exception
    {
-      log.debug(this + ": " + address + " joined");      
+      log.debug(this + ": " + address + " joined");
+      Integer newNode = findNodeIDForAddress(address);
+      
+      QuarantinedNode qNode = quarantinedNodes.remove(newNode);
+      
+      if (qNode == null) return;
+      
+      log.debug("A quarantined node " + qNode + " re-joined cluster.");
+
+      
+      //if I am the quarantined node, I do rejoin.
+      if (clusterState.isQuarantined(thisNodeID))
+      {
+         updateStateInStorage(thisNodeID, STATE_CLUSTERED);
+         PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
+         groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+      }
+      else
+      {
+         //accept rejoin (suckers?)
+         
+      }
    }
    
    public void nodesLeft(List addresses) throws Throwable
@@ -1618,6 +1643,14 @@
       
       sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
    }
+   
+   //some node is finally dead and failed over
+   //so it's safe to remove it now.
+   public void handleNodeDead(int nodeId)
+   {
+      QuarantinedNode node = quarantinedNodes.remove(nodeId);
+      log.info("Quarantined node " + node + " is finally dead.");
+   }
 
    /**
     * @param originatorNodeID - the ID of the node that initiated the modification.
@@ -3833,11 +3866,30 @@
    {
       Map<Integer, NodeState> states = new HashMap<Integer, NodeState>();
 
+      private ClusterState(Map<Integer, NodeState> copy)
+      {
+         states = new HashMap<Integer, NodeState>(copy);
+      }
+
+      public void clear()
+      {
+         states.clear();
+      }
+
+      public ClusterState()
+      {
+      }
+
       public void addNode(int nodeID, long timestamp, int nodeState)
       {
          states.put(nodeID, new NodeState(nodeID, timestamp, nodeState));
       }
 
+      public synchronized ClusterState copy()
+      {
+         return new ClusterState(states);
+      }
+
       public boolean isQuarantined(int qNodeID)
       {
          NodeState nState = states.get(qNodeID);
@@ -3882,4 +3934,9 @@
       }
    }
 
+   public boolean isAvailable()
+   {
+      return !clusterState.isQuarantined(thisNodeID); // not available for cluster requests while this node is quarantined
+   }
+
 }

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java	2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/NodeDeadRequest.java	2011-01-19 03:52:45 UTC (rev 8171)
@@ -43,6 +43,10 @@
    {
       leftNode = qNode.intValue();
    }
+   
+   public NodeDeadRequest()
+   {
+   }
 
    public void write(DataOutputStream out) throws Exception
    {
@@ -56,30 +60,12 @@
 
    Object execute(RequestTarget office) throws Throwable
    {
+      office.handleNodeDead(leftNode);
       return null;
    }
 
    byte getType()
    {
-      return 0;
+      return ClusterRequest.NODE_DEAD_REQUEST;
    }
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
 }

Modified: branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2011-01-18 18:00:47 UTC (rev 8170)
+++ branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2011-01-19 03:52:45 UTC (rev 8171)
@@ -68,4 +68,8 @@
    void handleAddAllReplicatedDeliveries(int nodeID, Map deliveries) throws Exception;
    
    void handleGetReplicatedDeliveries(String queueName, Address returnAddress) throws Exception;
+   
+   void handleNodeDead(int nodeId);
+
+   boolean isAvailable();
 }



More information about the jboss-cvs-commits mailing list