[jboss-cvs] JBossCache/src/org/jboss/cache/buddyreplication ...

Manik Surtani msurtani at jboss.com
Sat Dec 9 10:14:46 EST 2006


  User: msurtani
  Date: 06/12/09 10:14:46

  Modified:    src/org/jboss/cache/buddyreplication  Tag:
                        Branch_JBossCache_1_4_0 BuddyManager.java
  Log:
  Refactored for more correct behaviour
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.33.2.8  +848 -814  JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BuddyManager.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/buddyreplication/BuddyManager.java,v
  retrieving revision 1.33.2.7
  retrieving revision 1.33.2.8
  diff -u -b -r1.33.2.7 -r1.33.2.8
  --- BuddyManager.java	10 Nov 2006 20:30:24 -0000	1.33.2.7
  +++ BuddyManager.java	9 Dec 2006 15:14:46 -0000	1.33.2.8
  @@ -32,10 +32,12 @@
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashMap;
  +import java.util.HashSet;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.Properties;
  +import java.util.Set;
   import java.util.Vector;
   
   /**
  @@ -72,6 +74,10 @@
        */
       Map buddyPool;
       /**
  +    * The nullBuddyPool is a set of addresses that have not specified buddy pools.
  +    */
  +   Set nullBuddyPool;
  +   /**
        * Name of the buddy pool for current instance.  May be null if buddy pooling is not used.
        */
       String buddyPoolName;
  @@ -86,7 +92,6 @@
        * Needs to deal with concurrent access - concurrent assignTo/removeFrom buddy grp
        */
       Map buddyGroupsIParticipateIn = new ConcurrentReaderHashMap();
  -//    Map buddyGroupsIParticipateIn = new HashMap();
   
       /**
        * Queue to deal with queued up view change requests - which are handled asynchronously
  @@ -119,7 +124,11 @@
        * Flag to prevent us receiving and processing remote calls before we've started
        */
       private boolean initialised = false;
  -//    private Latch initLatch = new Latch();
  +
  +   /**
  +    * Lock to synchronise on to ensure buddy pool info is received before buddies are assigned to groups.
  +    */
  +   private final Object poolInfoNotifierLock = new Object();
   
       public BuddyManager(Element element)
       {
  @@ -138,16 +147,17 @@
           }
           finally
           {
  -            if (log.isDebugEnabled()) log.debug("Using buddy communication timeout of " + buddyCommunicationTimeout + " millis");
  +         if (log.isDebugEnabled())
  +            log.debug("Using buddy communication timeout of " + buddyCommunicationTimeout + " millis");
           }
           buddyPoolName = XmlHelper.readStringContents(element, "buddyPoolName");
           if (buddyPoolName != null && buddyPoolName.equals("")) buddyPoolName = null;
   
           // initialise buddy pool map
  -        if (buddyPoolName != null) buddyPool = new ConcurrentReaderHashMap();
  +      buddyPool = new ConcurrentReaderHashMap();
  +      nullBuddyPool = new HashSet();
   
           // now read the buddy locator details and create accordingly.
  -
           String buddyLocatorClass = null;
           Properties buddyLocatorProperties = null;
           try
  @@ -206,21 +216,23 @@
   
       public void init(TreeCache cache) throws Exception
       {
  -        log.debug("Starting buddy manager");
           this.cache = cache;
  +      final Object localAddress = cache.getLocalAddress();
           buddyGroup = new BuddyGroup();
  -        buddyGroup.setDataOwner((IpAddress) cache.getLocalAddress());
  -        buddyGroup.setGroupName(getGroupNameFromAddress(cache.getLocalAddress()));
  +      buddyGroup.setDataOwner((IpAddress) localAddress);
  +      buddyGroup.setGroupName(getGroupNameFromAddress(localAddress));
  +      log.debug("Starting buddy manager for data owner " + buddyGroup.getDataOwner());
  +
   
           if (buddyPoolName != null)
           {
               buddyPool.put(buddyGroup.getDataOwner(), buddyPoolName);
           }
   
  +      broadcastBuddyPoolMembership();
  +
           // allow waiting threads to process.
           initialised = true;
  -        broadcastBuddyPoolMembership();
  -//        initLatch.release();
   
           // register a TreeCache Listener to reassign buddies as and when view changes occur
   
  @@ -264,19 +276,11 @@
               {
                   Vector newMembers = newView.getMembers();
   
  -                // the whole 'oldMembers' concept is only used for buddy pool announcements.
  -                if (buddyPoolName == null)
  -                {
  -                   enqueueViewChange(null, newMembers);
  -                }
  -                else
  -                {
                       enqueueViewChange(oldMembers == null ? null : new Vector(oldMembers), new Vector(newMembers));
                       if (oldMembers == null) oldMembers = new Vector();
                       oldMembers.clear();
                       oldMembers.addAll(newMembers);
                   }
  -            }
           });
   
           // assign buddies based on what we know now
  @@ -389,9 +393,19 @@
       {
           if (log.isDebugEnabled())
           {
  -            log.debug("Received announcement that cache instance " + address + " is in buddy pool " + poolName);
  +         log.debug(buddyGroup.getDataOwner() + ": received announcement that cache instance " + address + " is in buddy pool " + poolName);
  +      }
  +      if (poolName != null)
  +         buddyPool.put(address, poolName);
  +      else
  +         nullBuddyPool.add(address);
  +
  +      // notify any waiting view change threads that buddy pool info has been received.
  +      synchronized (poolInfoNotifierLock)
  +      {
  +         log.trace("Notifying any waiting view change threads that we have received buddy pool info.");
  +         poolInfoNotifierLock.notifyAll();
           }
  -        if (poolName != null) buddyPool.put(address, poolName);
       }
   
       /**
  @@ -400,7 +414,6 @@
        */
       public void handleRemoveFromBuddyGroup(String groupName) throws BuddyNotInitException
       {
  -        //waitForInit();
          if (!initialised) throw new BuddyNotInitException("Not yet initialised");
           if (log.isInfoEnabled()) log.info("Removing self from buddy group " + groupName);
           buddyGroupsIParticipateIn.remove(groupName);
  @@ -427,6 +440,8 @@
        */
       public void handleAssignToBuddyGroup(BuddyGroup newGroup, Map state) throws Exception
       {
  +      if (log.isTraceEnabled())
  +         log.trace("Handling assign to buddy grp.  Sender: " + newGroup.getGroupName() + "; My instance: " + buddyGroup.getDataOwner());
          // if we haven't initialised, throw an exception.
          if (!initialised) throw new BuddyNotInitException("Not yet initialised");
   
  @@ -442,7 +457,7 @@
              marshaller = cache.getMarshaller();
           }
   
  -        for (Iterator it = state.entrySet().iterator(); it.hasNext(); )
  +      for (Iterator it = state.entrySet().iterator(); it.hasNext();)
           {
              Map.Entry entry = (Map.Entry) it.next();
              Fqn fqn = (Fqn) entry.getKey();
  @@ -510,14 +525,12 @@
       {
           return transformFqns(call, call.getMethodId() != MethodDeclarations.dataGravitationCleanupMethod_id);
       }
  +
       public JBCMethodCall transformFqns(JBCMethodCall call, boolean transformForCurrentCall)
       {
           if (call != null && call.getArgs() != null)
           {
               JBCMethodCall call2 = new JBCMethodCall(call.getMethod(), (Object[]) call.getArgs().clone(), call.getMethodId());
  -//            call2.setId(call.getId());
  -//            call2.setMethod(call.getMethod());
  -//            call2.setArgs((Object[]) call.getArgs().clone());
               handleArgs(call2.getArgs(), transformForCurrentCall);
               return call2;
           }
  @@ -547,7 +560,7 @@
        {
            try
            {
  -            makeRemoteCall(buddies, replicateCall);
  +            makeRemoteCall(buddies, replicateCall, true);
               break;
            }
            catch (Exception e)
  @@ -576,12 +589,6 @@
   
       private void addBuddies(List buddies) throws Exception
       {
  -        // this check is redundant - if buddies is empty this method will not be called. - Manik
  -
  -//       if (buddies.size() == 0)
  -//          return;
  -
  -
           if (log.isInfoEnabled())
           {
               log.info("Assigning new buddies to buddy group [" + buddyGroup.getGroupName() + "].  New buddies are " + buddies);
  @@ -638,7 +645,7 @@
          {
              try
              {
  -              makeRemoteCall(buddies, replicateCall);
  +            makeRemoteCall(buddies, replicateCall, true);
                 break;
              }
              catch (Exception e)
  @@ -663,7 +670,8 @@
          }
   
   
  -        log.trace("addToGroup notification complete");
  +      if (log.isTraceEnabled())
  +         log.trace("addToGroup notification complete (data owner " + buddyGroup.getDataOwner() + ")");
       }
   
       private byte[] acquireState(Fqn fqn) throws Exception
  @@ -746,7 +754,7 @@
   
           try
           {
  -            makeRemoteCall(recipients, replicateCall);
  +         makeRemoteCall(recipients, replicateCall, true);
           }
           catch (Exception e)
           {
  @@ -754,7 +762,7 @@
           }
       }
   
  -    private void makeRemoteCall(List recipients, MethodCall call) throws Exception
  +   private void makeRemoteCall(List recipients, MethodCall call, boolean sync) throws Exception
       {
           // remove non-members from dest list
           if (recipients != null)
  @@ -771,7 +779,7 @@
               }
           }
   
  -        cache.callRemoteMethods(recipients, call, true, true, buddyCommunicationTimeout);
  +      cache.callRemoteMethods(recipients, call, sync, true, buddyCommunicationTimeout);
       }
   
   
  @@ -812,6 +820,7 @@
   
       /**
        * Assumes the backup Fqn if the current instance is the data owner
  +    *
        * @param originalFqn
        * @return backup fqn
        */
  @@ -892,24 +901,49 @@
   
           private void handleEnqueuedViewChange() throws Exception
           {
  -            log.trace("Handling queued view change");
  +         log.trace("Waiting for enqueued view change events");
               List[] members = (List[]) queue.take(); // 2 element array - 0 - oldMembers, 1 - newMembers
   
  +         // make sure new buddies have broadcast their pool memberships.
  +         while (!buddyPoolInfoAvailable(members[1]))
  +         {
  +            synchronized (poolInfoNotifierLock)
  +            {
  +               log.trace("Not received necessary buddy pool info for all new members yet; waiting on poolInfoNotifierLock.");
  +               poolInfoNotifierLock.wait();
  +            }
  +         }
  +
  +         log.trace("Broadcasting pool membership details, triggered by view change.");
  +         if (members[0] == null)
  +            broadcastBuddyPoolMembership();
  +         else
  +         {
  +            List delta = new ArrayList();
  +            delta.addAll(members[1]);
  +            delta.removeAll(members[0]);
  +            broadcastBuddyPoolMembership(delta);
  +         }
  +
               // always refresh buddy list.
               reassignBuddies(members[1]);
  +      }
   
  -            if (buddyPoolName != null)
  +      private boolean buddyPoolInfoAvailable(List newMembers)
               {
  -                log.trace("Broadcasting pool membership details, triggered by view change.");
  -                broadcastBuddyPoolMembership();
  -//                if (members[0] == null)
  -//                    broadcastBuddyPoolMembership();
  -//                else
  -//                {
  -//                    members[1].removeAll(members[0]);
  -//                    broadcastBuddyPoolMembership(members[1]);
  -//                }
  +         boolean infoReceived = true;
  +         Iterator i = newMembers.iterator();
  +         while (i.hasNext())
  +         {
  +            Object address = i.next();
  +            infoReceived = infoReceived && (address.equals(cache.getLocalAddress()) || buddyPool.keySet().contains(address) || nullBuddyPool.contains(address));
               }
  +
  +         log.trace(buddyGroup.getDataOwner() + " received buddy pool info for new members " + newMembers + "?  " + infoReceived);
  +
  +         return infoReceived;
           }
  +
       }
   }
  +
  
  
  



More information about the jboss-cvs-commits mailing list