[jboss-cvs] JBossAS SVN: r69539 - trunk/cluster/src/main/org/jboss/ha/framework/server.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 1 10:20:29 EST 2008


Author: bstansberry at jboss.com
Date: 2008-02-01 10:20:29 -0500 (Fri, 01 Feb 2008)
New Revision: 69539

Modified:
   trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
   trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
Log:
[JBAS-4313] Clean up mechanism for excluding duplicate nodes
Clean up how ClusterPartition handles first view

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2008-02-01 15:16:57 UTC (rev 69538)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/ClusterPartition.java	2008-02-01 15:20:29 UTC (rev 69539)
@@ -29,8 +29,6 @@
 import java.io.Serializable;
 import java.lang.ref.WeakReference;
 import java.net.InetAddress;
-import java.rmi.dgc.VMID;
-import java.rmi.server.UID;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
@@ -57,10 +55,8 @@
 import org.jboss.logging.Logger;
 import org.jboss.naming.NonSerializableFactory;
 import org.jboss.system.ServiceMBeanSupport;
-import org.jboss.system.server.ServerConfigUtil;
 import org.jgroups.Channel;
 import org.jgroups.ChannelFactory;
-import org.jgroups.Event;
 import org.jgroups.ExtendedMessageListener;
 //import org.jgroups.JChannel;
 import org.jgroups.ExtendedMembershipListener;
@@ -201,7 +197,7 @@
     * An exception occuring upon fetch serviceState.
     */
    protected Exception setStateException;
-   private final Object stateLock = new Object();
+   private final Object channelLock = new Object();
    private final MessageListenerAdapter messageListener = new MessageListenerAdapter();
 
    // Static --------------------------------------------------------
@@ -297,45 +293,21 @@
       dispatcher.setRequestMarshaller(new RequestMarshallerImpl());
       dispatcher.setResponseMarshaller(new ResponseMarshallerImpl());
       
-      // Store our uniqueId in the channel
-      configureUniqueId();
-      
       channel.connect(getPartitionName());
       
       try
       {
-         // get current JG group properties
-         
+         // get current JG group properties         
          log.debug("get nodeName");
          this.localJGAddress = (IpAddress)channel.getLocalAddress();
          this.me = new ClusterNodeImpl(this.localJGAddress);
          this.nodeName = this.me.getName();
 
-         // FIXME -- just block waiting for viewAccepted!
          log.debug("Get current members");
-         View view = channel.getView();
-         this.jgmembers = (Vector)view.getMembers().clone();
-         this.members = translateAddresses(this.jgmembers); // TRANSLATE
-         log.info("Number of cluster members: " + members.size());
-         for(int m = 0; m > members.size(); m ++)
-         {
-            Object node = members.get(m);
-            log.debug(node);
-         }
-         // Keep a list of other members only for "exclude-self" RPC calls
-         
-         this.jgotherMembers = (Vector)view.getMembers().clone();
-         this.jgotherMembers.remove (channel.getLocalAddress());
-         this.otherMembers = translateAddresses(this.jgotherMembers); // TRANSLATE
-         log.info ("Other members: " + this.otherMembers.size ());
+         waitForView();                
 
-         verifyNodeIsUnique(view.getMembers());
+         verifyNodeIsUnique();
 
-         // Update the initial view id
-         this.currentViewId = view.getVid().getId();
-
-         // We must now synchronize new serviceState transfer subscriber
-         //
          fetchState();
          
          replicantManager.startService();
@@ -349,12 +321,12 @@
          asynchHandler.start();
          
          // Bind ourself in the public JNDI space if configured to do so
-         if (!bindIntoJndi)
-          return;
-         
-         Context ctx = new InitialContext();
-         this.bind("/HAPartition/" + getPartitionName(), this, ClusterPartition.class, ctx);
-         log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+         if (bindIntoJndi)
+         {
+            Context ctx = new InitialContext();
+            this.bind("/HAPartition/" + getPartitionName(), this, ClusterPartition.class, ctx);
+            log.debug("Bound in JNDI under /HAPartition/" + getPartitionName());
+         }
       }
       catch (Throwable t)
       {
@@ -390,7 +362,8 @@
 //    add the destroyPartition() step
       try
       {
-         channel.disconnect();
+         if (channel.isConnected())
+            channel.disconnect();
       }
       catch (Exception e)
       {
@@ -457,7 +430,7 @@
       boolean rc = channel.getState(null, getStateTransferTimeout());
       if (rc)
       {
-         synchronized (stateLock)
+         synchronized (channelLock)
          {
             while (!isStateSet)
             {
@@ -466,7 +439,7 @@
 
                try
                {
-                  stateLock.wait();
+                  channelLock.wait();
                }
                catch (InterruptedException iex)
                {
@@ -635,12 +608,11 @@
          setStateException = new Exception(t);
    }
 
-   private void notifyStateTransferCompleted()
+   private void notifyChannelLock()
    {
-      synchronized (stateLock)
-      {
-         // Notify wait that serviceState has been set.
-         stateLock.notifyAll();
+      synchronized (channelLock)
+      {         
+         channelLock.notifyAll();
       }
    }
    
@@ -681,11 +653,9 @@
       try
       {
          // we update the view id
-         //
          this.currentViewId = newView.getVid().getId();
 
          // Keep a list of other members only for "exclude-self" RPC calls
-         //
          this.jgotherMembers = (Vector)newView.getMembers().clone();
          this.jgotherMembers.remove (channel.getLocalAddress());
          this.otherMembers = translateAddresses (this.jgotherMembers); // TRANSLATE!
@@ -699,18 +669,28 @@
 
          Vector newjgMembers = (Vector)newView.getMembers().clone();
          Vector newMembers = translateAddresses(newjgMembers); // TRANSLATE
-         if (this.members == null)
+         this.members = newMembers;
+         this.jgmembers = newjgMembers;
+         
+         if (oldMembers == null)
          {
             // Initial viewAccepted
-            this.members = newMembers;
-            this.jgmembers = newjgMembers;
             log.debug("ViewAccepted: initial members set for partition " + getPartitionName() + ": " +
                      this.currentViewId + " (" + this.members + ")");
+            
+            log.info("Number of cluster members: " + members.size());
+            for(int m = 0; m > members.size(); m ++)
+            {
+               Object node = members.get(m);
+               log.debug(node);
+            }
+            log.info ("Other members: " + this.otherMembers.size ());
+            
+            // Wake up the deployer thread blocking in waitForView
+            notifyChannelLock();
             return;
-         }
-         this.members = newMembers;
-         this.jgmembers = newjgMembers;
-
+         }        
+         
          int difference = 0;
          if (oldMembers == null)
             difference = newMembers.size () - 1;
@@ -757,6 +737,26 @@
       }
    }
 
+   private void waitForView() throws Exception
+   {
+      synchronized (channelLock)
+      {
+         if (this.members == null)
+         {
+            try
+            {
+               channelLock.wait(getMethodCallTimeout());
+            }
+            catch (InterruptedException iex)
+            {
+            }
+            
+            if (this.members == null)
+               throw new IllegalStateException("No view received from Channel");
+         }
+      }
+   }
+
    // HAPartition implementation ----------------------------------------------
    
    public String getNodeName()
@@ -796,9 +796,12 @@
 
    public ClusterNode[] getClusterNodes ()
    {
-      ClusterNode[] nodes = new ClusterNode[this.members.size()];
-      nodes = (ClusterNode[]) this.members.toArray(nodes);
-      return nodes;
+      synchronized (members)
+      {
+         ClusterNode[] nodes = new ClusterNode[this.members.size()];
+         nodes = (ClusterNode[]) this.members.toArray(nodes);
+         return nodes;
+      }
    }
 
    public ClusterNode getClusterNode ()
@@ -1196,24 +1199,32 @@
    
    // Protected -----------------------------------------------------
 
-   protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception
+   protected void verifyNodeIsUnique () throws IllegalStateException
    {
-      byte[] localUniqueName = this.localJGAddress.getAdditionalData();
-      if (localUniqueName == null)
+      ClusterNodeImpl matched = null;
+      for (ClusterNode member : getClusterNodes())
       {
-         log.error("No additional information has been found in the JGroups address; " +
-                  "make sure you are running with a correct version of JGroups and that the protocols " +
-                  "you are using support 'additionalData' behaviour.");
-         throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; local node name is missing.");
-      }
-
-      for (int i = 0; i < javaGroupIpAddresses.size(); i++)
-      {
-         IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
-         if (!address.equals(this.localJGAddress))
+         if (member.equals(me))
          {
-            if (localUniqueName.equals(address.getAdditionalData()))
-               throw new Exception ("Local node (" + this.localJGAddress + ") removed from cluster; another node (" + address + ") publicizing the same name was already there.");
+            if (matched == null)
+            {
+               // We of course are in the view, so we expect one match
+               // Just track that we've had one
+               matched = (ClusterNodeImpl) member;
+            }
+            else
+            {
+               // Two nodes in view match us; try to figure out which one isn't us
+               ClusterNodeImpl other = matched;
+               if (other.getOriginalJGAddress().equals(((ClusterNodeImpl)me).getOriginalJGAddress()))
+               {
+                  other = (ClusterNodeImpl) member;
+               }
+               throw new IllegalStateException("Found member " + other + 
+                     " in current view that duplicates us (" + me + "). This" +
+                     " node cannot join partition until duplicate member has" +
+                     " been removed");
+            }
          }
       }
    }
@@ -1460,83 +1471,7 @@
    }
 
    // Protected --------------------------------------------------------------
-
-   protected void configureUniqueId() throws Exception
-   {
-      // We push the independent name in the protocol stack 
-      // before connecting to the cluster
-      boolean pushNodeName = true;
-      String uniqueId = config.getNodeUniqueId();
-      if (uniqueId == null || "".equals(uniqueId)) {
-         
-         // See if additional_data has already been set 
-         // (likely by the JChannelFactory); if yes,
-         // just use that
-         IpAddress ourAddr = (IpAddress) channel.getLocalAddress();
-         if (ourAddr != null)
-         {
-            byte[] additional_data = ourAddr.getAdditionalData();
-            if (additional_data != null)
-            {
-               uniqueId = new String(additional_data);
-               pushNodeName = false;
-            }
-         }
-      }
       
-      if (uniqueId == null || "".equals(uniqueId)) {
-         uniqueId = generateUniqueId();
-      }
-      
-      if (pushNodeName)
-      {
-         java.util.HashMap staticNodeName = new java.util.HashMap();
-         staticNodeName.put("additional_data", uniqueId.getBytes());
-         this.channel.down(new Event(Event.CONFIG, staticNodeName));
-      }
-      
-      config.setNodeUniqueId(uniqueId);
-   }
-   
-   protected String generateUniqueId() throws Exception
-   {
-      // we first try to find a simple meaningful name:
-      // 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine
-      // 2nd) "local-IP:JMV_GUID" otherwise
-      // 3rd) return a fully GUID-based representation
-      //
-
-      // Before anything we determine the local host IP (and NOT name as this could be
-      // resolved differently by other nodes...)
-
-      // But use the specified node address for multi-homing
-      
-      String hostIP = null;
-      InetAddress address = ServerConfigUtil.fixRemoteAddress(config.getNodeAddress());
-      if (address == null)
-      {
-         log.debug ("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)");
-         log.debug ("using a full GUID strategy");
-         return new VMID().toString();
-      }
-      else
-      {
-         hostIP = address.getHostAddress();
-      }
-
-      // 1st: is JNDI up and running?
-      int namingPort = config.getNamingServicePort();
-      if (namingPort > 0)
-      {
-         return hostIP + ":" + namingPort;
-      }
-
-      // 2nd: host-GUID strategy
-      //
-      String uid = new UID().toString();
-      return hostIP + ":" + uid;
-   }
-   
    /**
     * Creates an object from a byte buffer
     */
@@ -1655,7 +1590,8 @@
          }
          finally
          {
-            notifyStateTransferCompleted();
+            // Notify waiting thread that serviceState has been set.
+            notifyChannelLock();
          }
       }
 
@@ -1714,7 +1650,8 @@
          }
          finally
          {
-            notifyStateTransferCompleted();
+            // Notify waiting thread that serviceState has been set.
+            notifyChannelLock();
          }
       }
       

Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java	2008-02-01 15:16:57 UTC (rev 69538)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/JChannelFactory.java	2008-02-01 15:20:29 UTC (rev 69539)
@@ -58,6 +58,7 @@
    private String nodeName;
    private int namingServicePort = -1;
    private int state;
+   private boolean assignLogicalAddresses = true;
 
    /**
     * Overrides the superclass version by generating a unique node id
@@ -85,7 +86,8 @@
 //      Channel channel = super.createMultiplexerChannel(stack_name, id, register_for_state_transfer, substate_id);
       Channel channel = super.createMultiplexerChannel(stack_name, id, false, null);
       
-      setChannelUniqueId(channel);
+      if (assignLogicalAddresses)
+         setChannelUniqueId(channel);
       
       return channel;
    }
@@ -132,7 +134,39 @@
    {
       this.namingServicePort = jndiPort;
    }
+   
+   /**
+    * Gets whether this factory should create a "logical address" (or use
+    * one set via {@link #setNodeName(String)}) and assign it to
+    * any newly created <code>Channel</code> as JGroups "additional_data".
+    * 
+    * @see #setAssignLogicalAddresses(boolean)
+    */
+   public boolean getAssignLogicalAddresses()
+   {
+      return assignLogicalAddresses;
+   }
 
+   /**
+    * Sets whether this factory should create a "logical address" (or use
+    * one set via {@link #setNodeName(String)} and assign it to
+    * any newly created <code>Channel</code> as JGroups "additional_data".
+    * <p>
+    * Any such logical address will be used by <code>HAPartition</code>
+    * to assign a name to the <code>ClusterNode</code> object representing 
+    * this node. If a logical address is not set, the <code>ClusterNode</code> 
+    * will use the address and port JGroups is using to receive messages to
+    * create its name.
+    * </p>
+    * <p>
+    * Default is <code>true</code>.
+    * </p>
+    */
+   public void setAssignLogicalAddresses(boolean logicalAddresses)
+   {
+      this.assignLogicalAddresses = logicalAddresses;
+   }
+
    @Override
    public void create() throws Exception
    {




More information about the jboss-cvs-commits mailing list