[jboss-cvs] JBossCache/src/org/jboss/cache/buddyreplication ...

Manik Surtani msurtani at jboss.com
Thu Dec 21 11:19:33 EST 2006


  User: msurtani
  Date: 06/12/21 11:19:33

  Modified:    src/org/jboss/cache/buddyreplication     BuddyLocator.java
                        BuddyManager.java BuddyGroup.java
                        NextMemberBuddyLocator.java
  Log:
  refactored
  
  Revision  Changes    Path
  1.5       +9 -8      JBossCache/src/org/jboss/cache/buddyreplication/BuddyLocator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BuddyLocator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyLocator.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -b -r1.4 -r1.5
  --- BuddyLocator.java	23 Oct 2006 05:46:39 -0000	1.4
  +++ BuddyLocator.java	21 Dec 2006 16:19:33 -0000	1.5
  @@ -8,7 +8,7 @@
   
   import org.jboss.cache.config.BuddyReplicationConfig;
   import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig;
  -import org.jgroups.stack.IpAddress;
  +import org.jgroups.Address;
   
   import java.util.List;
   import java.util.Map;
  @@ -36,6 +36,7 @@
      
      /**
       * Initialize this <code>BuddyLocator</code>.
  +    *
       * @param config configuration for this <code>BuddyLocator</code>. May be
       *               <code>null</code>, in which case the implementation should
       *               use its default configuration.
  @@ -46,17 +47,17 @@
       * Choose a set of buddies for the given node.  Invoked when a change in
       * cluster membership is detected.
       *
  -    * @param buddyPoolMap      Map<IpAddress, String> mapping nodes in the cluster to
  +    * @param buddyPoolMap      Map<Address, String> mapping nodes in the cluster to
       *                          the "buddy pool" they have identified themselves as
       *                          belonging too.  A BuddyLocator implementation can use
       *                          this information to preferentially assign buddies from
       *                          the same buddy pool as <code>dataOwner</code>.  May be
       *                          <code>null</code> if buddy pools aren't configured.
  -    * @param currentMembership List<IpAddress> of the current cluster members
  -    * @param dataOwner         IpAddress of the node for which buddies should be selected
  -    * @return List<IpAddress> of the nodes that should serve as buddies for
  +    * @param currentMembership List<Address> of the current cluster members
  +    * @param dataOwner         Address of the node for which buddies should be selected
  +    * @return List<Address> of the nodes that should serve as buddies for
       *         <code>dataOwner</code>. Will not be <code>null</code>, may
       *         be empty.
       */
  -   public List<IpAddress> locateBuddies(Map<IpAddress, String> buddyPoolMap, List<IpAddress> currentMembership, IpAddress dataOwner);
  +   public List<Address> locateBuddies(Map<Address, String> buddyPoolMap, List<Address> currentMembership, Address dataOwner);
   }
  
  
  
  1.52      +111 -57   JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BuddyManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java,v
  retrieving revision 1.51
  retrieving revision 1.52
  diff -u -b -r1.51 -r1.52
  --- BuddyManager.java	21 Dec 2006 15:14:16 -0000	1.51
  +++ BuddyManager.java	21 Dec 2006 16:19:33 -0000	1.52
  @@ -24,21 +24,22 @@
   import org.jboss.util.stream.MarshalledValueInputStream;
   import org.jgroups.Address;
   import org.jgroups.View;
  -import org.jgroups.stack.IpAddress;
   
   import java.io.ByteArrayInputStream;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.HashMap;
  +import java.util.HashSet;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
  +import java.util.Set;
   import java.util.Vector;
  -import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.LinkedBlockingQueue;
  +import java.util.concurrent.atomic.AtomicInteger;
   
   /**
    * Class that manages buddy replication groups.
  @@ -72,19 +73,24 @@
      /**
       * Map of buddy pools received from broadcasts
       */
  -   Map<IpAddress, String> buddyPool;
  +   Map<Address, String> buddyPool = new ConcurrentHashMap<Address, String>();
  +
  +   /**
  +    * The nullBuddyPool is a set of addresses that have not specified buddy pools.
  +    */
  +   final Set<Address> nullBuddyPool = new HashSet<Address>();
   
      /**
       * Map of bddy groups the current instance participates in as a backup node.
       * Keyed on String group name, values are BuddyGroup objects.
       * Needs to deal with concurrent access - concurrent assignTo/removeFrom buddy grp
       */
  -   Map<String, BuddyGroup> buddyGroupsIParticipateIn = new ConcurrentHashMap();
  +   Map<String, BuddyGroup> buddyGroupsIParticipateIn = new ConcurrentHashMap<String, BuddyGroup>();
   
      /**
       * Queue to deal with queued up view change requests - which are handled asynchronously
       */
  -   private final BlockingQueue<MembershipChange> queue = new LinkedBlockingQueue();
  +   private final BlockingQueue<MembershipChange> queue = new LinkedBlockingQueue<MembershipChange>();
   
      /**
       * Async thread that handles items on the view change queue
  @@ -101,11 +107,17 @@
      /**
       * number of times to retry communicating with a selected buddy if the buddy has not been initialised.
       */
  -   private static int UNINIT_BUDDIES_RETRIES = 3;
  +   private static int UNINIT_BUDDIES_RETRIES = 5;
      /**
       * wait time between retries
       */
  -   private static final long UNINIT_BUDDIES_RETRY_NAPTIME = 500;
  +   private static final long[] UNINIT_BUDDIES_RETRY_NAPTIME = {500, 1000, 1500, 2000, 2500};
  +
  +   /**
  +    * Lock to synchronise on to ensure buddy pool info is received before buddies are assigned to groups.
  +    */
  +   private final Object poolInfoNotifierLock = new Object();
  +
   
      /**
       * Flag to prevent us receiving and processing remote calls before we've started
  @@ -117,9 +129,6 @@
      {
         this.config = config;
   
  -      if (config.getBuddyPoolName() != null)
  -         buddyPool = new ConcurrentHashMap();
  -
         BuddyLocatorConfig blc = config.getBuddyLocatorConfig();
         try
         {
  @@ -178,7 +187,7 @@
         log.debug("Starting buddy manager");
         this.cache = cache;
         buddyGroup = new BuddyGroup();
  -      buddyGroup.setDataOwner((IpAddress) cache.getLocalAddress());
  +      buddyGroup.setDataOwner((Address) cache.getLocalAddress());
         buddyGroup.setGroupName(getGroupNameFromAddress(cache.getLocalAddress()));
   
         if (config.getBuddyPoolName() != null)
  @@ -186,20 +195,19 @@
            buddyPool.put(buddyGroup.getDataOwner(), config.getBuddyPoolName());
         }
   
  +      broadcastBuddyPoolMembership();
  +
         // allow waiting threads to process.
         initialised = true;
  -      broadcastBuddyPoolMembership();
  -//        initLatch.release();
   
         // register a TreeCache Listener to reassign buddies as and when view changes occur
  -
         cache.getNotifier().addCacheListener(new AbstractCacheListener()
         {
  -         private Vector oldMembers;
  +         private Vector<Address> oldMembers;
   
            public void viewChange(View newView)
            {
  -            Vector newMembers = newView.getMembers();
  +            Vector<Address> newMembers = newView.getMembers();
   
               // the whole 'oldMembers' concept is only used for buddy pool announcements.
               if (config.getBuddyPoolName() == null)
  @@ -208,8 +216,8 @@
               }
               else
               {
  -               enqueueViewChange(oldMembers == null ? null : new Vector(oldMembers), new Vector(newMembers));
  -               if (oldMembers == null) oldMembers = new Vector();
  +               enqueueViewChange(oldMembers == null ? null : new Vector<Address>(oldMembers), new Vector<Address>(newMembers));
  +               if (oldMembers == null) oldMembers = new Vector<Address>();
                  oldMembers.clear();
                  oldMembers.addAll(newMembers);
               }
  @@ -243,17 +251,19 @@
   
      // -------------- methods to be called by the tree cache listener --------------------
   
  -   static class MembershipChange {
  -      List oldMembers;
  -      List newMembers;
  -      public MembershipChange(List oldMembers, List newMembers)
  +   static class MembershipChange
  +   {
  +      List<Address> oldMembers;
  +      List<Address> newMembers;
  +
  +      public MembershipChange(List<Address> oldMembers, List<Address> newMembers)
         {
            this.oldMembers = oldMembers;
            this.newMembers = newMembers;
         }
      }
      
  -   private void enqueueViewChange(List oldMembers, List newMembers)
  +   private void enqueueViewChange(List<Address> oldMembers, List<Address> newMembers)
      {
         // put this on a queue
         try
  @@ -273,7 +283,7 @@
       * have been added.  Makes use of the BuddyLocator and then
       * makes RPC calls to remote nodes to assign/remove buddies.
       */
  -   private void reassignBuddies(List membership) throws Exception
  +   private void reassignBuddies(List<Address> membership) throws Exception
      {
         if (log.isDebugEnabled())
         {
  @@ -281,24 +291,20 @@
            log.debug("Entering updateGroup.  Current group: " + buddyGroup + ".  Current View membership: " + membership);
         }
         // some of my buddies have died!
  -      List newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
  -      List uninitialisedBuddies = new ArrayList();
  -      Iterator newBuddiesIt = newBuddies.iterator();
  -      while (newBuddiesIt.hasNext())
  +      List<Address> newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
  +      List<Address> uninitialisedBuddies = new ArrayList<Address>();
  +      for (Address newBuddy : newBuddies)
         {
  -         Object newBuddy = newBuddiesIt.next();
            if (!buddyGroup.buddies.contains(newBuddy))
            {
               uninitialisedBuddies.add(newBuddy);
            }
         }
   
  -      List obsoleteBuddies = new ArrayList();
  +      List<Address> obsoleteBuddies = new ArrayList<Address>();
         // find obsolete buddies
  -      Iterator originalBuddies = buddyGroup.buddies.iterator();
  -      while (originalBuddies.hasNext())
  +      for (Address origBuddy : buddyGroup.buddies)
         {
  -         Object origBuddy = originalBuddies.next();
            if (!newBuddies.contains(origBuddy))
            {
               obsoleteBuddies.add(origBuddy);
  @@ -332,13 +338,28 @@
       * Called by TreeCache._remoteAnnounceBuddyPoolName(Address address, String buddyPoolName)
       * when a view change occurs and caches need to inform the cluster of which buddy pool it is in.
       */
  -   public void handlePoolNameBroadcast(IpAddress address, String poolName)
  +   public void handlePoolNameBroadcast(Address address, String poolName)
      {
         if (log.isDebugEnabled())
         {
  -         log.debug("Received announcement that cache instance " + address + " is in buddy pool " + poolName);
  +         log.debug(buddyGroup.getDataOwner() + ": received announcement that cache instance " + address + " is in buddy pool " + poolName);
  +      }
  +      if (poolName != null)
  +         buddyPool.put(address, poolName);
  +      else
  +      {
  +         synchronized (nullBuddyPool)
  +         {
  +            if (!nullBuddyPool.contains(address)) nullBuddyPool.add(address);
  +         }
  +      }
  +
  +      // notify any waiting view change threads that buddy pool info has been received.
  +      synchronized (poolInfoNotifierLock)
  +      {
  +         log.trace("Notifying any waiting view change threads that we have received buddy pool info.");
  +         poolInfoNotifierLock.notifyAll();
         }
  -      if (poolName != null) buddyPool.put(address, poolName);
      }
   
      /**
  @@ -400,14 +421,14 @@
               ClassLoader cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS);
               Fqn integrationRoot = new Fqn(integrationBase, fqn);
               
  -            byte [] stateBuffer = (byte[]) entry.getValue();
  +            byte[] stateBuffer = (byte[]) entry.getValue();
               MarshalledValueInputStream in = null;
               try
               {
                  ByteArrayInputStream bais = new ByteArrayInputStream(stateBuffer);
                  in = new MarshalledValueInputStream(bais);
                  boolean hasState = in.readBoolean();
  -               if(hasState)
  +               if (hasState)
                  {
                     stateMgr.setState(in, integrationRoot, cl);
                  }
  @@ -427,7 +448,7 @@
   
      public static Fqn getBackupFqn(Object buddyGroupName, Fqn origFqn)
      {
  -      List elements = new ArrayList();
  +      List<Object> elements = new ArrayList<Object>();
         elements.add(BUDDY_BACKUP_SUBTREE);
         elements.add(buddyGroupName);
         elements.addAll(origFqn.peekElements());
  @@ -442,7 +463,7 @@
            return origFqn;
         }
   
  -      List elements = new ArrayList();
  +      List<Object> elements = new ArrayList<Object>();
         elements.add(BUDDY_BACKUP_SUBTREE);
         elements.add(buddyGroupRoot.get(1));
         elements.addAll(origFqn.peekElements());
  @@ -504,6 +525,7 @@
         MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
   
         int attemptsLeft = UNINIT_BUDDIES_RETRIES;
  +      int currentAttempt = 0;
   
         while (attemptsLeft-- > 0)
         {
  @@ -519,7 +541,7 @@
                  if (attemptsLeft > 0)
                  {
                     log.info("One of the buddies have not been initialised.  Will retry after a short nap.");
  -                  Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME);
  +                  Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
                  }
                  else
                  {
  @@ -535,7 +557,7 @@
         log.trace("removeFromGroup notification complete");
      }
   
  -   private void addBuddies(List buddies) throws Exception
  +   private void addBuddies(List<Address> buddies) throws Exception
      {
         // this check is redundant - if buddies is empty this method will not be called. - Manik
   
  @@ -592,6 +614,7 @@
         MethodCall replicateCall = MethodCallFactory.create(MethodDeclarations.replicateMethod, membershipCall);
   
         int attemptsLeft = UNINIT_BUDDIES_RETRIES;
  +      int currentAttempt = 0;
   
         while (attemptsLeft-- > 0)
         {
  @@ -607,7 +630,8 @@
                  if (attemptsLeft > 0)
                  {
                     log.info("One of the buddies have not been initialised.  Will retry after a short nap.");
  -                  Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME);
  +                  Thread.sleep(UNINIT_BUDDIES_RETRY_NAPTIME[currentAttempt++]);
  +
                  }
                  else
                  {
  @@ -690,7 +714,7 @@
         broadcastBuddyPoolMembership(null);
      }
   
  -   private void broadcastBuddyPoolMembership(List recipients)
  +   private void broadcastBuddyPoolMembership(List<Address> recipients)
      {
         // broadcast to other caches
         if (log.isDebugEnabled())
  @@ -849,24 +873,54 @@
   
         private void handleEnqueuedViewChange() throws Exception
         {
  -         log.trace("Handling queued view change");
  +         log.trace("Waiting for enqueued view change events");
            MembershipChange members = queue.take();
   
  -         // always refresh buddy list.
  -         reassignBuddies(members.newMembers);
  -
  -         if (config.getBuddyPoolName() != null)
  +         // make sure new buddies have broadcast their pool memberships.
  +         while (!buddyPoolInfoAvailable(members.newMembers))
  +         {
  +            synchronized (poolInfoNotifierLock)
            {
  +               log.trace("Not received necessary buddy pool info for all new members yet; waiting on poolInfoNotifierLock.");
  +               poolInfoNotifierLock.wait();
  +            }
  +         }
  +
               log.trace("Broadcasting pool membership details, triggered by view change.");
  +         //if (members.oldMembers == null)
               broadcastBuddyPoolMembership();
  -//                if (members[0] == null)
  -//                    broadcastBuddyPoolMembership();
   //                else
   //                {
  -//                    members[1].removeAll(members[0]);
  -//                    broadcastBuddyPoolMembership(members[1]);
  +//            List<Address> delta = new ArrayList<Address>();
  +//            delta.addAll(members.newMembers);
  +//            delta.removeAll(members.oldMembers);
  +//            broadcastBuddyPoolMembership(delta);
   //                }
  +
  +         // always refresh buddy list.
  +         reassignBuddies(members.newMembers);
            }
  +
  +      private boolean buddyPoolInfoAvailable(List<Address> newMembers)
  +      {
  +         boolean infoReceived = true;
  +         for (Address address : newMembers)
  +         {
  +            // make sure no one is concurrently writing to nullBuddyPool.
  +            synchronized (nullBuddyPool)
  +            {
  +//               log.trace("Testing on node " + buddyGroup.getDataOwner() + " for candidate " + address);
  +//               log.trace("Is me? " + address.equals(cache.getLocalAddress()));
  +//               log.trace("is in bP? " + buddyPool.keySet().contains(address));
  +//               log.trace("is in nBP? " + nullBuddyPool.contains(address));
  +               infoReceived = infoReceived && (address.equals(cache.getLocalAddress()) || buddyPool.keySet().contains(address) || nullBuddyPool.contains(address));
  +            }
  +         }
  +
  +         if (log.isTraceEnabled())
  +            log.trace(buddyGroup.getDataOwner() + " received buddy pool info for new members " + newMembers + "?  " + infoReceived);
  +
  +         return infoReceived;
         }
      }
   }
  \ No newline at end of file
  
  
  
  1.9       +7 -8      JBossCache/src/org/jboss/cache/buddyreplication/BuddyGroup.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BuddyGroup.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyGroup.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -b -r1.8 -r1.9
  --- BuddyGroup.java	11 Dec 2006 21:14:34 -0000	1.8
  +++ BuddyGroup.java	21 Dec 2006 16:19:33 -0000	1.9
  @@ -6,12 +6,11 @@
    */
   package org.jboss.cache.buddyreplication;
   
  -import org.jgroups.stack.IpAddress;
  +import org.jgroups.Address;
   
   import java.io.Serializable;
  -import java.util.ArrayList;
   import java.util.List;
  -import java.util.Collections;
  +import java.util.Vector;
   
   /**
    * Value object that represents a buddy group
  @@ -27,12 +26,12 @@
   
      private String groupName;
   
  -   private IpAddress dataOwner;
  +   private Address dataOwner;
   
      /**
       * Vector<Address> - a list of JGroups addresses
       */
  -   List buddies = Collections.synchronizedList(new ArrayList());
  +   List<Address> buddies = new Vector<Address>();//Collections.synchronizedList(new ArrayList());
   
      //    List buddies = new ArrayList();
   
  @@ -46,12 +45,12 @@
         this.groupName = groupName;
      }
   
  -   public IpAddress getDataOwner()
  +   public Address getDataOwner()
      {
         return dataOwner;
      }
   
  -   public void setDataOwner(IpAddress dataOwner)
  +   public void setDataOwner(Address dataOwner)
      {
         this.dataOwner = dataOwner;
      }
  @@ -61,7 +60,7 @@
         return buddies;
      }
   
  -   public void setBuddies(List buddies)
  +   public void setBuddies(List<Address> buddies)
      {
         this.buddies = buddies;
      }
  
  
  
  1.8       +9 -8      JBossCache/src/org/jboss/cache/buddyreplication/NextMemberBuddyLocator.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: NextMemberBuddyLocator.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/NextMemberBuddyLocator.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -b -r1.7 -r1.8
  --- NextMemberBuddyLocator.java	4 Nov 2006 22:03:24 -0000	1.7
  +++ NextMemberBuddyLocator.java	21 Dec 2006 16:19:33 -0000	1.8
  @@ -9,6 +9,7 @@
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig;
  +import org.jgroups.Address;
   import org.jgroups.stack.IpAddress;
   
   import java.net.InetAddress;
  @@ -61,10 +62,10 @@
         }
      }
   
  -   public List<IpAddress> locateBuddies(Map<IpAddress, String> buddyPoolMap, List<IpAddress> currentMembership, IpAddress dataOwner)
  +   public List<Address> locateBuddies(Map<Address, String> buddyPoolMap, List<Address> currentMembership, Address dataOwner)
      {
         int numBuddiesToFind = Math.min(config.getNumBuddies(), currentMembership.size());
  -      List<IpAddress> buddies = new ArrayList<IpAddress>(numBuddiesToFind);
  +      List<Address> buddies = new ArrayList<Address>(numBuddiesToFind);
   
         // find where we are in the list.
         int dataOwnerSubscript = currentMembership.indexOf(dataOwner);
  @@ -110,7 +111,7 @@
               break;
            }
   
  -         IpAddress candidate = currentMembership.get(subscript);
  +         Address candidate = currentMembership.get(subscript);
            if (
                    !candidate.equals(dataOwner) && // ignore self from selection as buddy
                            !buddies.contains(candidate) && // havent already considered this candidate
  @@ -136,11 +137,11 @@
         return ownerPoolName.equals(candidatePoolName);
      }
   
  -   private boolean isColocated(IpAddress candidate, IpAddress dataOwner)
  +   private boolean isColocated(Address candidate, Address dataOwner)
      {
         // assume they're both IpAddresses??
  -      InetAddress inetC = candidate.getIpAddress();
  -      InetAddress inetD = dataOwner.getIpAddress();
  +      InetAddress inetC = ((IpAddress) candidate).getIpAddress();
  +      InetAddress inetD = ((IpAddress) dataOwner).getIpAddress();
   
         if (inetC.equals(inetD)) return true;
   
  
  
  



More information about the jboss-cvs-commits mailing list