[jboss-cvs] JBoss Messaging SVN: r1664 - branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 30 12:39:55 EST 2006


Author: timfox
Date: 2006-11-30 12:39:53 -0500 (Thu, 30 Nov 2006)
New Revision: 1664

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
Log:
Second interim commit for failover refactoring



Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-30 15:27:59 UTC (rev 1663)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-11-30 17:39:53 UTC (rev 1664)
@@ -135,6 +135,8 @@
    //Map < node, failover node>
    private Map failoverMap;
    
+   private Set leftSet;
+   
    private Element syncChannelConfigE;
    
    private Element asyncChannelConfigE;
@@ -177,6 +179,8 @@
       replicationListeners = new HashSet();
       
       failoverMap = new LinkedHashMap();
+      
+      leftSet = new HashSet();
    }
    
    /*
@@ -662,6 +666,20 @@
     
    // PostOfficeInternal implementation ------------------------------------------------------------------
    
+   public void handleNodeLeft(int nodeId) throws Exception
+   {
+      lock.writeLock().acquire();
+      //Remember that this node announced it is leaving; leaveMessageReceived() consumes the entry later
+      try
+      {
+         leftSet.add(new Integer(nodeId));
+      }
+      finally
+      {
+         lock.writeLock().release();
+      }      
+   }
+   
    public void putReplicantFromCluster(int nodeId, Serializable key, Serializable replicant) throws Exception
    {
       lock.writeLock().acquire();
@@ -1227,78 +1245,9 @@
       }      
    }
    
-   /*
-    * Removes all non durable binding data, and any local replicant data for the specified
-    * node
-    */
-   public void removeDataForNode(int parameterNodeId) throws Exception
-   {
-      log.info("Node " + parameterNodeId + " requested to leave cluster");
-      
-      lock.writeLock().acquire();
-
-      try
-      {          
-         Map nameMap = (Map)nameMaps.get(new Integer(parameterNodeId));
-
-         if (nameMap != null)
-         {
-            List toRemove = new ArrayList();
-            
-            Iterator iter = nameMap.values().iterator();
-            
-            while (iter.hasNext())
-            {
-               Binding binding = (Binding)iter.next();
-               
-               if (!binding.getQueue().isRecoverable())
-               {
-                  //We only remove the non durable bindings - we still need to be able to handle
-                  //messages for a durable subscription "owned" by a node that is not active any more!
-                  toRemove.add(binding);
-               }
-            }
-            
-            iter = toRemove.iterator();
-            
-            while (iter.hasNext())
-            {
-               Binding binding = (Binding)iter.next();
-               
-               removeBinding(parameterNodeId, binding.getQueue().getName());
-            }
-         }
-         
-         //We need to remove any replicant data for the node
-         //this includes the node-address info
-         Iterator iter = replicatedData.entrySet().iterator();
-         
-         while (iter.hasNext())
-         {
-            Map.Entry entry = (Map.Entry)iter.next();
-            
-            String key = (String)entry.getKey();
-            
-            Map replicants = (Map)entry.getValue();
-            
-            replicants.remove(new Integer(parameterNodeId));
-            
-            if (replicants.isEmpty())
-            {
-               iter.remove();
-            }     
-            
-            //Need to trigger listeners
-            notifyListeners(key, replicants);
-         }                  
-      }
-      finally
-      {
-         lock.writeLock().release();
-      }
-   }
    
    
+   
    public int getNodeId()
    {
       return nodeId;
@@ -1706,27 +1655,27 @@
          routerMap.remove(queueName);
       }
    }
-
+   
    protected void loadBindings() throws Exception
    {
       if (trace) { log.trace(this.nodeId + " loading bindings"); }
-
+      
       boolean isState = syncChannel.getState(null, stateTimeout);
-
+      
       if (!isState)
       {
          //Must be first member in group or non clustered- we load the state ourself from the database
-
+         
          if (trace) { log.trace(this.nodeId + " First member of group- so loading bindings from db"); }
-
+         
          super.loadBindings();
       }
       else
       {
          //The state will be set in due course via the MessageListener - we must wait until this happens
-
+         
          if (trace) { log.trace(this.nodeId + " Not first member of group- so waiting for state to arrive...."); }
-
+         
          synchronized (setStateLock)
          {
             //TODO we should implement a timeout on this
@@ -1735,45 +1684,132 @@
                setStateLock.wait();
             }
          }
-
+         
          if (trace) { log.trace(this.nodeId + " Received state"); }
       }
    }
+   
+   protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
+   {
+      Queue queue;
+      if (nodeId == this.nodeId)
+      {
+         QueuedExecutor executor = (QueuedExecutor)pool.get();
+         //Local node: a real LocalClusteredQueue built with a pooled executor; any other node gets a RemoteQueueStub below
+         queue = new LocalClusteredQueue(this, nodeId, queueName, channelId, ms, pm, true,
+                  durable, executor, filter, tr);
+      }
+      else
+      {
+         queue = new RemoteQueueStub(nodeId, queueName, channelId, durable, pm, filter);
+      }
+      //Wrap the queue in a binding for the given node and condition, carrying the failed flag
+      Binding binding = new DefaultBinding(nodeId, condition, queue, failed);
+      
+      return binding;
+   }
+   
+   // Private ------------------------------------------------------------------------------------------
+   
+   private boolean leaveMessageReceived(Integer nodeId) throws Exception
+   {
+      lock.writeLock().acquire();
+      //True if a leave message was recorded for this node (see handleNodeLeft); the record is consumed here
+      try
+      {
+         boolean removed = leftSet.remove(nodeId);
+         
+         return removed;
+      }
+      finally
+      {
+         lock.writeLock().release();
+      }    
+   }
+   
+   /*
+    * Removes all non durable binding data, and any local replicant data for the specified
+    * node
+    */
+   private void removeDataForNode(Integer parameterNodeId) throws Exception
+   {
+      log.info("Node " + parameterNodeId + " requested to leave cluster");
+      
+      lock.writeLock().acquire();
 
-    protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
-    {
-        Queue queue;
-        if (nodeId == this.nodeId)
-        {
-           QueuedExecutor executor = (QueuedExecutor)pool.get();
+      try
+      {          
+         Map nameMap = (Map)nameMaps.get(parameterNodeId);
 
-           queue = new LocalClusteredQueue(this, nodeId, queueName, channelId, ms, pm, true,
-                                           durable, executor, filter, tr);
-        }
-        else
-        {
-           queue = new RemoteQueueStub(nodeId, queueName, channelId, durable, pm, filter);
-        }
-
-        Binding binding = new DefaultBinding(nodeId, condition, queue, failed);
-
-        return binding;
-    }
-
-    // Private ------------------------------------------------------------------------------------------
-
-    private void notifyListeners(Serializable key, Map replicants)
-    {
-       Iterator iter = replicationListeners.iterator();
-       
-       while (iter.hasNext())
-       {
-          ReplicationListener listener = (ReplicationListener)iter.next();
-          
-          listener.onReplicationChange(key, replicants);
-       }
-    } 
-    
+         if (nameMap != null)
+         {
+            List toRemove = new ArrayList();
+            
+            Iterator iter = nameMap.values().iterator();
+            
+            while (iter.hasNext())
+            {
+               Binding binding = (Binding)iter.next();
+               
+               if (!binding.getQueue().isRecoverable())
+               {
+                  //We only remove the non durable bindings - we still need to be able to handle
+                  //messages for a durable subscription "owned" by a node that is not active any more!
+                  toRemove.add(binding);
+               }
+            }
+            
+            iter = toRemove.iterator();
+            
+            while (iter.hasNext())
+            {
+               Binding binding = (Binding)iter.next();
+               
+               removeBinding(parameterNodeId.intValue(), binding.getQueue().getName());
+            }
+         }
+         
+         //We need to remove any replicant data for the node
+         //this includes the node-address info
+         Iterator iter = replicatedData.entrySet().iterator();
+         
+         while (iter.hasNext())
+         {
+            Map.Entry entry = (Map.Entry)iter.next();
+            
+            String key = (String)entry.getKey();
+            
+            Map replicants = (Map)entry.getValue();
+            
+            replicants.remove(parameterNodeId);
+            
+            if (replicants.isEmpty())
+            {
+               iter.remove();
+            }     
+            
+            //Need to trigger listeners
+            notifyListeners(key, replicants);
+         }                  
+      }
+      finally
+      {
+         lock.writeLock().release();
+      }
+   }
+   
+   private void notifyListeners(Serializable key, Map replicants)
+   {
+      Iterator iter = replicationListeners.iterator();
+      //Pass the changed key and its replicant map to every listener held in replicationListeners
+      while (iter.hasNext())
+      {
+         ReplicationListener listener = (ReplicationListener)iter.next();
+         
+         listener.onReplicationChange(key, replicants);
+      }
+   } 
+   
    /*
     * Multicast a sync request
     */
@@ -2135,6 +2171,58 @@
    }
    
    /*
+    * A new node has joined the group
+    */
+   private void nodeJoined(Address address) throws Exception
+   {
+      //We need to regenerate the failover map
+      //NOTE: the address argument is not used - the map is rebuilt from the whole current view
+      generateFailoverMap(currentView);     
+   }
+   
+   /*
+    * A node has left the group - clean up after it and, if it crashed and we are its failover node, fail over
+    */
+   private void nodeLeft(Address address) throws Throwable
+   {
+      Integer nodeId = getNodeIdForSyncAddress(address);
+      //No mapping means we cannot identify the departed node - fail fast rather than hit a null id below
+      if (nodeId == null)
+      {
+         throw new IllegalStateException("Cannot find node id for address " + address);
+      }
+      //A recorded leave message (see handleNodeLeft) means a clean shutdown; no record means the node crashed
+      boolean crashed = !this.leaveMessageReceived(nodeId);
+                        
+      if (trace) { log.trace("Node " + address + " id: " + nodeId +" has left the group, crashed = " + crashed); }
+            
+      //Cleanup any hanging transactions - we do this irrespective of whether the node crashed
+      check(nodeId);
+      
+      //Need to evaluate this before we regenerate the failover map
+      boolean isFailover = isFailoverNodeForNode(nodeId.intValue());
+      
+      //Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
+      //since that may cause connection factories to be rebound
+      generateFailoverMap(currentView);
+      
+      //Remove any replicant data and non durable bindings for the node - again we need to do this
+      //irrespective of whether the node crashed
+      //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
+      removeDataForNode(nodeId);
+      
+      if (crashed && isFailover)
+      {
+         //The node crashed and we are the failover node
+         //so let's perform failover
+         
+         //TODO server side valve
+         
+         failOver(nodeId.intValue());
+      }
+   }
+      
+   /*
     * We use this class so we notice when members leave the group
     */
    private class ControlMembershipListener implements MembershipListener
@@ -2148,83 +2236,55 @@
       {
          //NOOP
       }
-
+      
       public void viewAccepted(View view)
       {
          if (trace) { log.trace(nodeId + " Got new view, size=" + view.size()); }
 
-         log.info("JBoss Messaging DefaultClusteredPostOffice Accepted new view:" + view);
-         
          //JGroups will make sure this method is never called by more than one thread concurrently
          
-         if (currentView != null)
-         {
-            Iterator iter = currentView.getMembers().iterator();
+         View oldView = currentView;
+         
+         currentView = view;
+         
+         try
+         {         
+            if (oldView != null)
+            {
+               Iterator iter = oldView.getMembers().iterator();
+               
+               while (iter.hasNext())
+               {
+                  Address address = (Address)iter.next();
+                  
+                  if (!view.containsMember(address))
+                  {
+                     nodeLeft(address);
+                  }
+               }
+            }
             
+            Iterator iter = view.getMembers().iterator();
+            
             while (iter.hasNext())
             {
                Address address = (Address)iter.next();
                
-               if (!view.containsMember(address))
+               if (oldView == null || !oldView.containsMember(address))
                {
-                  //Member must have left                                    
-                  if (trace) { log.trace(nodeId + " it seems that member " + address + " has left the group"); }
-                  
-                  Address currentAddress = syncChannel.getLocalAddress();
-                  
-                  //We don't remove bindings for ourself                  
-                  if (!address.equals(currentAddress))
-                  {                  
-                     try
-                     {
-                        Integer nodeId = getNodeIdForSyncAddress(address);
-                        
-                        //Perform a check - the member might have crashed and left uncommitted transactions
-                        //we need to resolve this - this should be performed irrespective of
-                        //whether the node crashed or left cleanly
-                        if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " Performing cleanup for node " + nodeId); }
-
-                        check(nodeId);
-                        
-                        //Remove any non durable binding and replicant data for the exited node
-                        //this should be performed irrespective of whether the node crashed or left cleanly
-                        removeDataForNode(nodeId.intValue());
-                        
-                        // nodeId==null means the server left the cluster without a problem. It sent a message before
-                        // leaving the cluster
-                        if (nodeId != null)
-                        {
-                           //This means the node has crashed
-                           if (trace) { log.trace("The node " + nodeId + " crashed"); }
-                                                      
-                           //If we are the failover node for the failed node then we should perform failover
-                           if (isFailoverNodeForNode(nodeId.intValue()))
-                           {
-                              failOver(nodeId.intValue());                              
-                           }
-
-                           if (trace) { log.trace(DefaultClusteredPostOffice.this.nodeId + " failover complete"); }
-                        }
-                        else
-                        {
-                           if (trace) { log.trace("The node " + nodeId + " left cleanly"); }
-                        }
-                     }
-                     catch (Throwable e)
-                     {
-                        log.error("Caught Exception in MembershipListener", e);
-                        IllegalStateException e2 = new IllegalStateException(e.getMessage());
-                        e2.setStackTrace(e.getStackTrace());
-                        throw e2;
-                     }
-                  }
+                  nodeJoined(address);
                }
             }
          }
-         
-         currentView = view;
+         catch (Throwable e)
+         {
+            log.error("Caught Exception in MembershipListener", e);
+            IllegalStateException e2 = new IllegalStateException(e.getMessage());
+            e2.setStackTrace(e.getStackTrace());
+            throw e2;
+         }
       }
-
+  
       public byte[] getState()
       {        
          //NOOP

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java	2006-11-30 15:27:59 UTC (rev 1663)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java	2006-11-30 17:39:53 UTC (rev 1664)
@@ -4,10 +4,15 @@
 import java.io.DataOutputStream;
 
 /**
+ * 
+ * A LeaveClusterRequest
+ *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @version <tt>$Revision:$</tt>
- *          <p/>
- *          $Id:$
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
  */
 public class LeaveClusterRequest extends ClusterRequest
 {
@@ -30,7 +35,7 @@
 
    Object execute(PostOfficeInternal office) throws Throwable
    {
-      office.removeDataForNode(nodeId);
+      office.handleNodeLeft(nodeId);
       return null;
    }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-11-30 15:27:59 UTC (rev 1663)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-11-30 17:39:53 UTC (rev 1664)
@@ -49,8 +49,8 @@
    
    void removeBindingFromCluster(int nodeId, String queueName)
       throws Exception;
-
-   void removeDataForNode(int nodeId) throws Exception;
+ 
+   void handleNodeLeft(int nodeId) throws Exception;
    
    void putReplicantFromCluster(int nodeId, Serializable key, Serializable replicant) throws Exception;
    




More information about the jboss-cvs-commits mailing list