[jboss-cvs] JBossCache/src/org/jboss/cache/buddyreplication ...

Manik Surtani manik at jboss.org
Sun May 27 11:21:55 EDT 2007


  User: msurtani
  Date: 07/05/27 11:21:55

  Modified:    src/org/jboss/cache/buddyreplication  BuddyManager.java
  Log:
  Fixed more BR (buddy replication) brittleness
  
  Revision  Changes    Path
  1.76      +94 -93    JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BuddyManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java,v
  retrieving revision 1.75
  retrieving revision 1.76
  diff -u -b -r1.75 -r1.76
  --- BuddyManager.java	25 May 2007 16:34:51 -0000	1.75
  +++ BuddyManager.java	27 May 2007 15:21:55 -0000	1.76
  @@ -25,6 +25,7 @@
   import org.jboss.util.stream.MarshalledValueInputStream;
   import org.jboss.util.stream.MarshalledValueOutputStream;
   import org.jgroups.Address;
  +import org.jgroups.Channel;
   import org.jgroups.View;
   import org.jgroups.util.Util;
   
  @@ -34,6 +35,7 @@
   import java.util.Collection;
   import java.util.HashMap;
   import java.util.Iterator;
  +import java.util.LinkedList;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
  @@ -54,10 +56,6 @@
      private static Log log = LogFactory.getLog(BuddyManager.class);
   
      /**
  -    * A place to drain the view change queue.
  -    */
  -   private Collection<MembershipChange> sink = new NullCollection<MembershipChange>();
  -   /**
       * Configuration object.
       */
      final BuddyReplicationConfig config;
  @@ -124,8 +122,10 @@
       */
      private final Object poolInfoNotifierLock = new Object();
   
  -
      private CountDownLatch initialisationLatch = new CountDownLatch(1);
  +   // a dummy MembershipChange - a poison-pill to be placed on the membership change queue to notify async handler
  +   // threads to exit gracefully when the BuddyManager has been stopped.
  +   private static final MembershipChange STOP_NOTIFIER = new MembershipChange(null, null);
   
      public BuddyManager(BuddyReplicationConfig config)
      {
  @@ -186,6 +186,22 @@
         return s.replace(':', '_');
      }
   
  +   /**
  +    * Stops the buddy manager and the related async thread.
  +    */
  +   public void stop()
  +   {
  +      try
  +      {
  +         queue.clear();
  +         queue.put(STOP_NOTIFIER);
  +      }
  +      catch (InterruptedException ie)
  +      {
  +         // do nothing - we're stopping anyway
  +      }
  +   }
  +
      public void init(CacheImpl cache) throws CacheException
      {
         log.debug("Starting buddy manager");
  @@ -265,6 +281,11 @@
            this.oldMembers = oldMembers;
            this.newMembers = newMembers;
         }
  +
  +      public String toString()
  +      {
  +         return "MembershipChange: Old members = " + oldMembers + " New members = " + newMembers;
  +      }
      }
   
      private synchronized void enqueueViewChange(List<Address> oldMembers, List<Address> newMembers)
  @@ -272,9 +293,14 @@
         // put this on a queue
         try
         {
  +         if (queue.peek() != STOP_NOTIFIER)
  +         {
            //first empty the queue.  All queued up view changes that have not been processed yet are now obsolete.
  -         queue.drainTo(sink);
  -         queue.put(new MembershipChange(oldMembers, newMembers));
  +            queue.clear();
  +            MembershipChange mc = new MembershipChange(oldMembers, newMembers);
  +            if (log.isTraceEnabled()) log.trace("Enqueueing " + mc + " for async processing");
  +            queue.put(mc);
  +         }
         }
         catch (InterruptedException e)
         {
  @@ -289,8 +315,10 @@
       * have been added.  Makes use of the BuddyLocator and then
       * makes RPC calls to remote nodes to assign/remove buddies.
       */
  -   private void reassignBuddies(List<Address> membership) throws CacheException
  +   private void reassignBuddies(List<Address> members) throws CacheException
      {
  +      List<Address> membership = new ArrayList<Address>(members); // defensive copy
  +
         if (log.isDebugEnabled())
         {
            log.debug("Data owner address " + cache.getLocalAddress());
  @@ -298,6 +326,13 @@
         }
         // some of my buddies have died!
         List<Address> newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
  +      List<Address> unreachableBuddies;
  +      if (!(unreachableBuddies = checkBuddyStatus(newBuddies)).isEmpty())
  +      {
  +         // some of the new buddies are unreachable.  Ditch them, try the algo again.
  +         membership.removeAll(unreachableBuddies);
  +         newBuddies = buddyLocator.locateBuddies(buddyPool, membership, buddyGroup.getDataOwner());
  +      }
         List<Address> uninitialisedBuddies = new ArrayList<Address>();
         List<Address> originalBuddies = buddyGroup.getBuddies();
   
  @@ -350,6 +385,21 @@
   
      }
   
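  +   /**
  +    * Checks the given members against the current JGroups view of the cache's channel.
  +    *
  +    * @param members addresses to check
  +    * @return the members that are not in the channel's current view; empty if all are present
  +    */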
  +   private List<Address> checkBuddyStatus(List<Address> members)
  +   {
  +      Channel ch = cache.getConfiguration().getRuntimeConfig().getChannel();
  +      View currentView = ch.getView();
  +      List<Address> deadBuddies = new LinkedList<Address>();
  +      for (Address a : members) if (!currentView.containsMember(a)) deadBuddies.add(a);
  +      return deadBuddies;
  +   }
  +
      // -------------- methods to be called by the tree cache  --------------------
   
      /**
  @@ -402,24 +452,23 @@
         buddyGroupsIParticipateIn.remove(groupName);
   
         // remove backup data for this group
  -      // TODO: should we REALLY be doing this?  How do we know that this node didn't die and by removing this data we're deleting bkups?
  -//      if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName);
  -//
  -//      try
  -//      {
  -//         // should be a LOCAL call.
  -//         // cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
  -//          
  -//         // cache.remove(new Fqn(BUDDY_BACKUP_SUBTREE_FQN, groupName));
  -//      }
  -//      catch (CacheException e)
  -//      {
  -//         log.error("Unable to remove backup data for group " + groupName, e);
  -//      }
  -//      finally
  -//      {
  -//         cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
  -//      }
  +      if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName);
  +
  +      try
  +      {
  +         // should be a LOCAL call.
  +         cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
  +
  +         cache.remove(new Fqn(BUDDY_BACKUP_SUBTREE_FQN, groupName));
  +      }
  +      catch (CacheException e)
  +      {
  +         log.error("Unable to remove backup data for group " + groupName, e);
  +      }
  +      finally
  +      {
  +         cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
  +      }
      }
   
      /**
  @@ -943,6 +992,7 @@
      private class AsyncViewChangeHandlerThread implements Runnable
      {
         private Thread t;
  +      private boolean isRunning = true;
   
         public void start()
         {
  @@ -966,7 +1016,7 @@
            {
               log.debug("Caught InterruptedException", e);
            }
  -         while (!Thread.interrupted())
  +         while (!Thread.interrupted() && isRunning)
            {
               try
               {
  @@ -989,6 +1039,20 @@
         {
            log.trace("Waiting for enqueued view change events");
            MembershipChange members = queue.take();
  +         if (members == STOP_NOTIFIER)
  +         {
  +            // time to go home
  +            isRunning = false;
  +            return;
  +         }
  +
  +         // there is a strange case where JGroups issues view changes and just includes self in new views, and then
  +         // quickly corrects it.  Happens intermittently on some unit tests.  If this is such a case, please ignore.
  +         if (members.newMembers.size() == 1 && members.newMembers.get(0).equals(cache.getLocalAddress()))
  +         {
  +            log.info("Ignoring membership change event since it only contains self.");
  +            return;
  +         }
   
            broadcastPoolMembership(members);
   
  @@ -1050,73 +1114,10 @@
   
            return infoReceived;
         }
  -   }
  -
  -   private class NullCollection<E> implements Collection<E>
  -   {
  -
  -      public int size()
  -      {
  -         return 0;  //Does nothing
  -      }
  -
  -      public boolean isEmpty()
  -      {
  -         return true;
  -      }
  -
  -      public boolean contains(Object o)
  -      {
  -         return false;
  -      }
  -
  -      public Iterator<E> iterator()
  -      {
  -         return null;
  -      }
  -
  -      public Object[] toArray()
  -      {
  -         return new Object[0];
  -      }
  -
  -      public <T> T[] toArray(T[] a)
  -      {
  -         return null;
  -      }
  -
  -      public boolean add(E o)
  -      {
  -         return true;
  -      }
  -
  -      public boolean remove(Object o)
  -      {
  -         return true;
  -      }
  -
  -      public boolean containsAll(Collection<?> c)
  -      {
  -         return false;
  -      }
  -
  -      public boolean addAll(Collection<? extends E> c)
  -      {
  -         return true;
  -      }
  -
  -      public boolean removeAll(Collection<?> c)
  -      {
  -         return true;
  -      }
  -
  -      public boolean retainAll(Collection<?> c)
  -      {
  -         return true;
  -      }
   
  -      public void clear()
  +      public void stop()
         {
  +         if (t != null) t.interrupt();
         }
      }
   }
  \ No newline at end of file
  
  
  



More information about the jboss-cvs-commits mailing list